use std::{ collections::{BTreeSet, HashSet}, hash::Hash, }; use egui_nav::ReturnType; use egui_virtual_list::VirtualList; use enostr::{NoteId, RelayPool}; use hashbrown::{hash_map::RawEntryMut, HashMap}; use nostrdb::{Filter, Ndb, Note, NoteKey, NoteReplyBuf, Transaction}; use notedeck::{NoteCache, NoteRef, UnknownIds}; use crate::{ actionbar::{process_thread_notes, NewThreadNotes}, multi_subscriber::ThreadSubs, timeline::MergeKind, }; use super::ThreadSelection; pub struct ThreadNode { pub replies: HybridSet, pub prev: ParentState, pub have_all_ancestors: bool, pub list: VirtualList, } #[derive(Clone)] pub enum ParentState { Unknown, None, Parent(NoteId), } /// Affords: /// - O(1) contains /// - O(log n) sorted insertion pub struct HybridSet { reversed: bool, lookup: HashSet, // fast deduplication ordered: BTreeSet, // sorted iteration } impl Default for HybridSet { fn default() -> Self { Self { reversed: Default::default(), lookup: Default::default(), ordered: Default::default(), } } } pub enum InsertionResponse { AlreadyExists, Merged(MergeKind), } impl HybridSet { pub fn insert(&mut self, val: T) -> InsertionResponse { if !self.lookup.insert(val) { return InsertionResponse::AlreadyExists; } let front_insertion = match self.ordered.iter().next() { Some(first) => (val >= *first) == self.reversed, None => true, }; self.ordered.insert(val); // O(log n) InsertionResponse::Merged(if front_insertion { MergeKind::FrontInsert } else { MergeKind::Spliced }) } } impl HybridSet { pub fn contains(&self, val: &T) -> bool { self.lookup.contains(val) // O(1) } } impl HybridSet { pub fn iter(&self) -> HybridIter<'_, T> { HybridIter { inner: self.ordered.iter(), reversed: self.reversed, } } pub fn new(reversed: bool) -> Self { Self { reversed, ..Default::default() } } } impl<'a, T> IntoIterator for &'a HybridSet { type Item = &'a T; type IntoIter = HybridIter<'a, T>; fn into_iter(self) -> Self::IntoIter { self.iter() } } pub struct HybridIter<'a, T> { inner: std::collections::btree_set::Iter<'a, T>, reversed: bool, } impl<'a, T> Iterator for HybridIter<'a, T> { type Item = &'a T; fn next(&mut self) -> Option { if self.reversed { self.inner.next_back() } else { self.inner.next() } } } impl ThreadNode { pub fn new(parent: ParentState) -> Self { Self { replies: HybridSet::new(true), prev: parent, have_all_ancestors: false, list: VirtualList::new(), } } } #[derive(Default)] pub struct Threads { pub threads: HashMap, pub subs: ThreadSubs, pub seen_flags: NoteSeenFlags, } impl Threads { /// Opening a thread. /// Similar to [[super::cache::TimelineCache::open]] pub fn open( &mut self, ndb: &mut Ndb, txn: &Transaction, pool: &mut RelayPool, thread: &ThreadSelection, new_scope: bool, col: usize, ) -> Option { tracing::info!("Opening thread: {:?}", thread); let local_sub_filter = if let Some(selected) = &thread.selected_note { vec![direct_replies_filter_non_root( selected.bytes(), thread.root_id.bytes(), )] } else { vec![direct_replies_filter_root(thread.root_id.bytes())] }; let selected_note_id = thread.selected_or_root(); self.seen_flags.mark_seen(selected_note_id); let filter = match self.threads.raw_entry_mut().from_key(&selected_note_id) { RawEntryMut::Occupied(_entry) => { // TODO(kernelkind): reenable this once the panic is fixed // // let node = entry.into_mut(); // if let Some(first) = node.replies.first() { // &filter::make_filters_since(&local_sub_filter, first.created_at + 1) // } else { // &local_sub_filter // } &local_sub_filter } RawEntryMut::Vacant(entry) => { let id = NoteId::new(*selected_note_id); let node = ThreadNode::new(ParentState::Unknown); entry.insert(id, node); &local_sub_filter } }; let new_notes = ndb.query(txn, filter, 500).ok().map(|r| { r.into_iter() .map(NoteRef::from_query_result) .collect::>() }); self.subs .subscribe(ndb, pool, col, thread, local_sub_filter, new_scope, || { replies_filter_remote(thread) }); new_notes.map(|notes| NewThreadNotes { selected_note_id: NoteId::new(*selected_note_id), notes: notes.into_iter().map(|f| f.key).collect(), }) } pub fn close( &mut self, ndb: &mut Ndb, pool: &mut RelayPool, thread: &ThreadSelection, return_type: ReturnType, id: usize, ) { tracing::info!("Closing thread: {:?}", thread); self.subs.unsubscribe(ndb, pool, id, thread, return_type); } /// Responsible for making sure the chain and the direct replies are up to date pub fn update( &mut self, selected: &Note<'_>, note_cache: &mut NoteCache, ndb: &Ndb, txn: &Transaction, unknown_ids: &mut UnknownIds, col: usize, ) { let Some(selected_key) = selected.key() else { tracing::error!("Selected note did not have a key"); return; }; let reply = note_cache .cached_note_or_insert_mut(selected_key, selected) .reply; self.fill_reply_chain_recursive(selected, &reply, note_cache, ndb, txn, unknown_ids); let node = self .threads .get_mut(&selected.id()) .expect("should be guarenteed to exist from `Self::fill_reply_chain_recursive`"); let Some(sub) = self.subs.get_local(col) else { tracing::error!("Was expecting to find local sub"); return; }; let keys = ndb.poll_for_notes(sub.sub, 10); if keys.is_empty() { return; } tracing::info!("Got {} new notes", keys.len()); process_thread_notes( &keys, node, &mut self.seen_flags, ndb, txn, unknown_ids, note_cache, ); } fn fill_reply_chain_recursive( &mut self, cur_note: &Note<'_>, cur_reply: &NoteReplyBuf, note_cache: &mut NoteCache, ndb: &Ndb, txn: &Transaction, unknown_ids: &mut UnknownIds, ) -> bool { let (unknown_parent_state, mut have_all_ancestors) = self .threads .get(&cur_note.id()) .map(|t| (matches!(t.prev, ParentState::Unknown), t.have_all_ancestors)) .unwrap_or((true, false)); if have_all_ancestors { return true; } let mut new_parent = None; let note_reply = cur_reply.borrow(cur_note.tags()); let next_link = 's: { let Some(parent) = note_reply.reply() else { break 's NextLink::None; }; if unknown_parent_state { new_parent = Some(ParentState::Parent(NoteId::new(*parent.id))); } let Ok(reply_note) = ndb.get_note_by_id(txn, parent.id) else { break 's NextLink::Unknown(parent.id); }; let Some(notekey) = reply_note.key() else { break 's NextLink::Unknown(parent.id); }; NextLink::Next(reply_note, notekey) }; match next_link { NextLink::Unknown(parent) => { unknown_ids.add_note_id_if_missing(ndb, txn, parent); } NextLink::Next(next_note, note_key) => { UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &next_note); let cached_note = note_cache.cached_note_or_insert_mut(note_key, &next_note); let next_reply = cached_note.reply; if self.fill_reply_chain_recursive( &next_note, &next_reply, note_cache, ndb, txn, unknown_ids, ) { have_all_ancestors = true; } if !self.seen_flags.contains(next_note.id()) { self.seen_flags.mark_replies( next_note.id(), selected_has_at_least_n_replies(ndb, txn, None, next_note.id(), 2), ); } } NextLink::None => { have_all_ancestors = true; new_parent = Some(ParentState::None); } } match self.threads.raw_entry_mut().from_key(&cur_note.id()) { RawEntryMut::Occupied(entry) => { let node = entry.into_mut(); if let Some(parent) = new_parent { node.prev = parent; } if have_all_ancestors { node.have_all_ancestors = true; } } RawEntryMut::Vacant(entry) => { let id = NoteId::new(*cur_note.id()); let parent = new_parent.unwrap_or(ParentState::Unknown); let (_, res) = entry.insert(id, ThreadNode::new(parent)); if have_all_ancestors { res.have_all_ancestors = true; } } } have_all_ancestors } } enum NextLink<'a> { Unknown(&'a [u8; 32]), Next(Note<'a>, NoteKey), None, } pub fn selected_has_at_least_n_replies( ndb: &Ndb, txn: &Transaction, selected: Option<&[u8; 32]>, root: &[u8; 32], n: u8, ) -> bool { let filter = if let Some(selected) = selected { &vec![direct_replies_filter_non_root(selected, root)] } else { &vec![direct_replies_filter_root(root)] }; let Ok(res) = ndb.query(txn, filter, n as i32) else { return false; }; res.len() >= n.into() } fn direct_replies_filter_non_root( selected_note_id: &[u8; 32], root_id: &[u8; 32], ) -> nostrdb::Filter { let tmp_selected = *selected_note_id; nostrdb::Filter::new() .kinds([1]) .custom(move |note: nostrdb::Note<'_>| { let reply = nostrdb::NoteReply::new(note.tags()); if reply.is_reply_to_root() { return false; } reply.reply().is_some_and(|r| r.id == &tmp_selected) }) .event(root_id) .build() } /// Custom filter requirements: /// - Do NOT capture references (e.g. `*root_id`) inside the closure /// - Instead, copy values outside and capture them with `move` /// /// Incorrect: /// .custom(|_| { *root_id }) // ❌ /// Also Incorrect: /// .custom(move |_| { *root_id }) // ❌ /// Correct: /// let tmp = *root_id; /// .custom(move |_| { tmp }) // ✅ fn direct_replies_filter_root(root_id: &[u8; 32]) -> nostrdb::Filter { let moved_root_id = *root_id; nostrdb::Filter::new() .kinds([1]) .custom(move |note: nostrdb::Note<'_>| { nostrdb::NoteReply::new(note.tags()) .reply_to_root() .is_some_and(|r| r.id == &moved_root_id) }) .event(root_id) .build() } fn replies_filter_remote(selection: &ThreadSelection) -> Vec { vec![ nostrdb::Filter::new() .kinds([1]) .event(selection.root_id.bytes()) .build(), nostrdb::Filter::new() .ids([selection.root_id.bytes()]) .limit(1) .build(), ] } /// Represents indicators that there is more content in the note to view #[derive(Default)] pub struct NoteSeenFlags { // true indicates the note has replies AND it has not been read pub flags: HashMap, } impl NoteSeenFlags { pub fn mark_seen(&mut self, note_id: &[u8; 32]) { self.flags.insert(NoteId::new(*note_id), false); } pub fn mark_replies(&mut self, note_id: &[u8; 32], has_replies: bool) { self.flags.insert(NoteId::new(*note_id), has_replies); } pub fn get(&self, note_id: &[u8; 32]) -> Option<&bool> { self.flags.get(¬e_id) } pub fn contains(&self, note_id: &[u8; 32]) -> bool { self.flags.contains_key(¬e_id) } }