add threads impl

Signed-off-by: kernelkind <kernelkind@gmail.com>
This commit is contained in:
kernelkind
2025-06-16 17:38:01 -04:00
parent 3c31e1a651
commit cdcca0ba35
3 changed files with 636 additions and 2 deletions

View File

@@ -2,10 +2,15 @@ use crate::{
column::Columns,
nav::{RouterAction, RouterType},
route::Route,
timeline::{ThreadSelection, TimelineCache, TimelineKind},
timeline::{
thread::{
selected_has_at_least_n_replies, InsertionResponse, NoteSeenFlags, ThreadNode, Threads,
},
ThreadSelection, TimelineCache, TimelineKind,
},
};
use enostr::{Pubkey, RelayPool};
use enostr::{NoteId, Pubkey, RelayPool};
use nostrdb::{Ndb, NoteKey, Transaction};
use notedeck::{
get_wallet_for_mut, note::ZapTargetAmount, Accounts, GlobalWallet, Images, NoteAction,
@@ -252,3 +257,103 @@ impl NewNotes {
}
}
}
pub struct NewThreadNotes {
pub selected_note_id: NoteId,
pub notes: Vec<NoteKey>,
}
impl NewThreadNotes {
pub fn process(
&self,
threads: &mut Threads,
ndb: &Ndb,
txn: &Transaction,
unknown_ids: &mut UnknownIds,
note_cache: &mut NoteCache,
) {
let Some(node) = threads.threads.get_mut(&self.selected_note_id.bytes()) else {
tracing::error!("Could not find thread node for {:?}", self.selected_note_id);
return;
};
process_thread_notes(
&self.notes,
node,
&mut threads.seen_flags,
ndb,
txn,
unknown_ids,
note_cache,
);
}
}
pub fn process_thread_notes(
notes: &Vec<NoteKey>,
thread: &mut ThreadNode,
seen_flags: &mut NoteSeenFlags,
ndb: &Ndb,
txn: &Transaction,
unknown_ids: &mut UnknownIds,
note_cache: &mut NoteCache,
) {
if notes.is_empty() {
return;
}
let mut has_spliced_resp = false;
let mut num_new_notes = 0;
for key in notes {
let note = if let Ok(note) = ndb.get_note_by_key(txn, *key) {
note
} else {
tracing::error!("hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", key);
continue;
};
// Ensure that unknown ids are captured when inserting notes
UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &note);
let created_at = note.created_at();
let note_ref = notedeck::NoteRef {
key: *key,
created_at,
};
if thread.replies.contains(&note_ref) {
continue;
}
let insertion_resp = thread.replies.insert(note_ref);
if let InsertionResponse::Merged(crate::timeline::MergeKind::Spliced) = insertion_resp {
has_spliced_resp = true;
}
if matches!(insertion_resp, InsertionResponse::Merged(_)) {
num_new_notes += 1;
}
if !seen_flags.contains(note.id()) {
let cached_note = note_cache.cached_note_or_insert_mut(*key, &note);
let note_reply = cached_note.reply.borrow(note.tags());
let has_reply = if let Some(root) = note_reply.root() {
selected_has_at_least_n_replies(ndb, txn, Some(note.id()), root.id, 1)
} else {
selected_has_at_least_n_replies(ndb, txn, None, note.id(), 1)
};
seen_flags.mark_replies(note.id(), has_reply);
}
}
if has_spliced_resp {
tracing::debug!(
"spliced when inserting {} new notes, resetting virtual list",
num_new_notes
);
thread.list.reset();
}
}

View File

@@ -21,6 +21,7 @@ use tracing::{debug, error, info, warn};
pub mod cache;
pub mod kind;
pub mod route;
pub mod thread;
pub use cache::TimelineCache;
pub use kind::{ColumnTitle, PubkeySource, ThreadSelection, TimelineKind};

View File

@@ -0,0 +1,528 @@
use std::{
collections::{BTreeSet, HashSet},
hash::Hash,
};
use egui_nav::ReturnType;
use egui_virtual_list::VirtualList;
use enostr::{NoteId, RelayPool};
use hashbrown::{hash_map::RawEntryMut, HashMap};
use nostrdb::{Filter, Ndb, Note, NoteKey, NoteReplyBuf, Transaction};
use notedeck::{NoteCache, NoteRef, UnknownIds};
use crate::{
actionbar::{process_thread_notes, NewThreadNotes},
multi_subscriber::ThreadSubs,
timeline::MergeKind,
};
use super::ThreadSelection;
pub struct ThreadNode {
pub replies: HybridSet<NoteRef>,
pub prev: ParentState,
pub have_all_ancestors: bool,
pub list: VirtualList,
}
#[derive(Clone)]
pub enum ParentState {
Unknown,
None,
Parent(NoteId),
}
/// Affords:
/// - O(1) contains
/// - O(log n) sorted insertion
pub struct HybridSet<T> {
reversed: bool,
lookup: HashSet<T>, // fast deduplication
ordered: BTreeSet<T>, // sorted iteration
}
impl<T> Default for HybridSet<T> {
fn default() -> Self {
Self {
reversed: Default::default(),
lookup: Default::default(),
ordered: Default::default(),
}
}
}
pub enum InsertionResponse {
AlreadyExists,
Merged(MergeKind),
}
impl<T: Copy + Ord + Eq + Hash> HybridSet<T> {
pub fn insert(&mut self, val: T) -> InsertionResponse {
if !self.lookup.insert(val) {
return InsertionResponse::AlreadyExists;
}
let front_insertion = match self.ordered.iter().next() {
Some(first) => (val >= *first) == self.reversed,
None => true,
};
self.ordered.insert(val); // O(log n)
InsertionResponse::Merged(if front_insertion {
MergeKind::FrontInsert
} else {
MergeKind::Spliced
})
}
}
impl<T: Eq + Hash> HybridSet<T> {
pub fn contains(&self, val: &T) -> bool {
self.lookup.contains(val) // O(1)
}
}
impl<T> HybridSet<T> {
pub fn iter(&self) -> HybridIter<'_, T> {
HybridIter {
inner: self.ordered.iter(),
reversed: self.reversed,
}
}
pub fn new(reversed: bool) -> Self {
Self {
reversed,
..Default::default()
}
}
}
impl<'a, T> IntoIterator for &'a HybridSet<T> {
type Item = &'a T;
type IntoIter = HybridIter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
pub struct HybridIter<'a, T> {
inner: std::collections::btree_set::Iter<'a, T>,
reversed: bool,
}
impl<'a, T> Iterator for HybridIter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
if self.reversed {
self.inner.next_back()
} else {
self.inner.next()
}
}
}
impl ThreadNode {
pub fn new(parent: ParentState) -> Self {
Self {
replies: HybridSet::new(true),
prev: parent,
have_all_ancestors: false,
list: VirtualList::new(),
}
}
}
#[derive(Default)]
pub struct Threads {
pub threads: HashMap<NoteId, ThreadNode>,
pub subs: ThreadSubs,
pub seen_flags: NoteSeenFlags,
}
impl Threads {
/// Opening a thread.
/// Similar to [[super::cache::TimelineCache::open]]
pub fn open(
&mut self,
ndb: &mut Ndb,
txn: &Transaction,
pool: &mut RelayPool,
thread: &ThreadSelection,
new_scope: bool,
col: usize,
) -> Option<NewThreadNotes> {
tracing::info!("Opening thread: {:?}", thread);
let local_sub_filter = if let Some(selected) = &thread.selected_note {
vec![direct_replies_filter_non_root(
selected.bytes(),
thread.root_id.bytes(),
)]
} else {
vec![direct_replies_filter_root(thread.root_id.bytes())]
};
let selected_note_id = thread.selected_or_root();
self.seen_flags.mark_seen(selected_note_id);
let filter = match self.threads.raw_entry_mut().from_key(&selected_note_id) {
RawEntryMut::Occupied(_entry) => {
// TODO(kernelkind): reenable this once the panic is fixed
//
// let node = entry.into_mut();
// if let Some(first) = node.replies.first() {
// &filter::make_filters_since(&local_sub_filter, first.created_at + 1)
// } else {
// &local_sub_filter
// }
&local_sub_filter
}
RawEntryMut::Vacant(entry) => {
let id = NoteId::new(*selected_note_id);
let node = ThreadNode::new(ParentState::Unknown);
entry.insert(id, node);
&local_sub_filter
}
};
let new_notes = ndb.query(txn, filter, 500).ok().map(|r| {
r.into_iter()
.map(NoteRef::from_query_result)
.collect::<Vec<_>>()
});
self.subs
.subscribe(ndb, pool, col, thread, local_sub_filter, new_scope, || {
replies_filter_remote(thread)
});
new_notes.map(|notes| NewThreadNotes {
selected_note_id: NoteId::new(*selected_note_id),
notes: notes.into_iter().map(|f| f.key).collect(),
})
}
pub fn close(
&mut self,
ndb: &mut Ndb,
pool: &mut RelayPool,
thread: &ThreadSelection,
return_type: ReturnType,
id: usize,
) {
tracing::info!("Closing thread: {:?}", thread);
self.subs.unsubscribe(ndb, pool, id, thread, return_type);
}
/// Responsible for making sure the chain and the direct replies are up to date
pub fn update(
&mut self,
selected: &Note<'_>,
note_cache: &mut NoteCache,
ndb: &Ndb,
txn: &Transaction,
unknown_ids: &mut UnknownIds,
col: usize,
) {
let Some(selected_key) = selected.key() else {
tracing::error!("Selected note did not have a key");
return;
};
let reply = note_cache
.cached_note_or_insert_mut(selected_key, selected)
.reply;
self.fill_reply_chain_recursive(selected, &reply, note_cache, ndb, txn, unknown_ids);
let node = self
.threads
.get_mut(&selected.id())
.expect("should be guarenteed to exist from `Self::fill_reply_chain_recursive`");
let Some(sub) = self.subs.get_local(col) else {
tracing::error!("Was expecting to find local sub");
return;
};
let keys = ndb.poll_for_notes(sub.sub, 10);
if keys.is_empty() {
return;
}
tracing::info!("Got {} new notes", keys.len());
process_thread_notes(
&keys,
node,
&mut self.seen_flags,
ndb,
txn,
unknown_ids,
note_cache,
);
}
fn fill_reply_chain_recursive(
&mut self,
cur_note: &Note<'_>,
cur_reply: &NoteReplyBuf,
note_cache: &mut NoteCache,
ndb: &Ndb,
txn: &Transaction,
unknown_ids: &mut UnknownIds,
) -> bool {
let (unknown_parent_state, mut have_all_ancestors) = self
.threads
.get(&cur_note.id())
.map(|t| (matches!(t.prev, ParentState::Unknown), t.have_all_ancestors))
.unwrap_or((true, false));
if have_all_ancestors {
return true;
}
let mut new_parent = None;
let note_reply = cur_reply.borrow(cur_note.tags());
let next_link = 's: {
let Some(parent) = note_reply.reply() else {
break 's NextLink::None;
};
if unknown_parent_state {
new_parent = Some(ParentState::Parent(NoteId::new(*parent.id)));
}
let Ok(reply_note) = ndb.get_note_by_id(txn, parent.id) else {
break 's NextLink::Unknown(parent.id);
};
let Some(notekey) = reply_note.key() else {
break 's NextLink::Unknown(parent.id);
};
NextLink::Next(reply_note, notekey)
};
match next_link {
NextLink::Unknown(parent) => {
unknown_ids.add_note_id_if_missing(ndb, txn, parent);
}
NextLink::Next(next_note, note_key) => {
UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &next_note);
let cached_note = note_cache.cached_note_or_insert_mut(note_key, &next_note);
let next_reply = cached_note.reply;
if self.fill_reply_chain_recursive(
&next_note,
&next_reply,
note_cache,
ndb,
txn,
unknown_ids,
) {
have_all_ancestors = true;
}
if !self.seen_flags.contains(next_note.id()) {
self.seen_flags.mark_replies(
next_note.id(),
selected_has_at_least_n_replies(ndb, txn, None, next_note.id(), 2),
);
}
}
NextLink::None => {
have_all_ancestors = true;
new_parent = Some(ParentState::None);
}
}
match self.threads.raw_entry_mut().from_key(&cur_note.id()) {
RawEntryMut::Occupied(entry) => {
let node = entry.into_mut();
if let Some(parent) = new_parent {
node.prev = parent;
}
if have_all_ancestors {
node.have_all_ancestors = true;
}
}
RawEntryMut::Vacant(entry) => {
let id = NoteId::new(*cur_note.id());
let parent = new_parent.unwrap_or(ParentState::Unknown);
let (_, res) = entry.insert(id, ThreadNode::new(parent));
if have_all_ancestors {
res.have_all_ancestors = true;
}
}
}
have_all_ancestors
}
}
enum NextLink<'a> {
Unknown(&'a [u8; 32]),
Next(Note<'a>, NoteKey),
None,
}
pub fn selected_has_at_least_n_replies(
ndb: &Ndb,
txn: &Transaction,
selected: Option<&[u8; 32]>,
root: &[u8; 32],
n: u8,
) -> bool {
let filter = if let Some(selected) = selected {
&vec![direct_replies_filter_non_root(selected, root)]
} else {
&vec![direct_replies_filter_root(root)]
};
let Ok(res) = ndb.query(txn, filter, n as i32) else {
return false;
};
res.len() >= n.into()
}
fn direct_replies_filter_non_root(
selected_note_id: &[u8; 32],
root_id: &[u8; 32],
) -> nostrdb::Filter {
let tmp_selected = *selected_note_id;
nostrdb::Filter::new()
.kinds([1])
.custom(move |n: nostrdb::Note<'_>| {
for tag in n.tags() {
if tag.count() < 4 {
continue;
}
let Some("e") = tag.get_str(0) else {
continue;
};
let Some(tagged_id) = tag.get_id(1) else {
continue;
};
if *tagged_id != tmp_selected {
// NOTE: if these aren't dereferenced a segfault occurs...
continue;
}
if let Some(data) = tag.get_str(3) {
if data == "reply" {
return true;
}
}
}
false
})
.event(root_id)
.build()
}
/// Custom filter requirements:
/// - Do NOT capture references (e.g. `*root_id`) inside the closure
/// - Instead, copy values outside and capture them with `move`
///
/// Incorrect:
/// .custom(|_| { *root_id }) // ❌
/// Also Incorrect:
/// .custom(move |_| { *root_id }) // ❌
/// Correct:
/// let tmp = *root_id;
/// .custom(move |_| { tmp }) // ✅
fn direct_replies_filter_root(root_id: &[u8; 32]) -> nostrdb::Filter {
let tmp_root = *root_id;
nostrdb::Filter::new()
.kinds([1])
.custom(move |n: nostrdb::Note<'_>| {
let mut contains_root = false;
for tag in n.tags() {
if tag.count() < 4 {
continue;
}
let Some("e") = tag.get_str(0) else {
continue;
};
if let Some(s) = tag.get_str(3) {
if s == "reply" {
return false;
}
}
let Some(tagged_id) = tag.get_id(1) else {
continue;
};
if *tagged_id != tmp_root {
continue;
}
if let Some(s) = tag.get_str(3) {
if s == "root" {
contains_root = true;
}
}
}
contains_root
})
.event(root_id)
.build()
}
fn replies_filter_remote(selection: &ThreadSelection) -> Vec<Filter> {
vec![
nostrdb::Filter::new()
.kinds([1])
.event(selection.root_id.bytes())
.build(),
nostrdb::Filter::new()
.ids([selection.root_id.bytes()])
.limit(1)
.build(),
]
}
/// Represents indicators that there is more content in the note to view
#[derive(Default)]
pub struct NoteSeenFlags {
// true indicates the note has replies AND it has not been read
pub flags: HashMap<NoteId, bool>,
}
impl NoteSeenFlags {
pub fn mark_seen(&mut self, note_id: &[u8; 32]) {
self.flags.insert(NoteId::new(*note_id), false);
}
pub fn mark_replies(&mut self, note_id: &[u8; 32], has_replies: bool) {
self.flags.insert(NoteId::new(*note_id), has_replies);
}
pub fn get(&self, note_id: &[u8; 32]) -> Option<&bool> {
self.flags.get(&note_id)
}
pub fn contains(&self, note_id: &[u8; 32]) -> bool {
self.flags.contains_key(&note_id)
}
}