From d52c50acc2d55119da635ad7fb649eae64050e2d Mon Sep 17 00:00:00 2001 From: kernelkind Date: Wed, 18 Sep 2024 13:05:23 -0400 Subject: [PATCH 1/2] multi subscriber impl Signed-off-by: kernelkind --- src/lib.rs | 1 + src/multi_subscriber.rs | 107 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+) create mode 100644 src/multi_subscriber.rs diff --git a/src/lib.rs b/src/lib.rs index 2362a9df..fa9c8a00 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ mod key_parsing; mod key_storage; pub mod login_manager; mod macos_key_storage; +mod multi_subscriber; mod nav; mod note; mod notecache; diff --git a/src/multi_subscriber.rs b/src/multi_subscriber.rs new file mode 100644 index 00000000..8379e44d --- /dev/null +++ b/src/multi_subscriber.rs @@ -0,0 +1,107 @@ +use enostr::{Filter, RelayPool}; +use nostrdb::Ndb; +use tracing::{error, info}; +use uuid::Uuid; + +use crate::filter::UnifiedSubscription; + +pub struct MultiSubscriber { + filters: Vec, + sub: Option, + subscribers: u32, +} + +impl MultiSubscriber { + pub fn new(filters: Vec) -> Self { + Self { + filters, + sub: None, + subscribers: 0, + } + } + + fn real_subscribe( + ndb: &Ndb, + pool: &mut RelayPool, + filters: Vec, + ) -> Option { + let subid = Uuid::new_v4().to_string(); + let sub = ndb.subscribe(&filters).ok()?; + + pool.subscribe(subid.clone(), filters); + + Some(UnifiedSubscription { + local: sub, + remote: subid, + }) + } + + pub fn unsubscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) { + if self.subscribers == 0 { + error!("No subscribers to unsubscribe from"); + return; + } + + self.subscribers -= 1; + if self.subscribers == 0 { + let sub = match self.sub { + Some(ref sub) => sub, + None => { + error!("No remote subscription to unsubscribe from"); + return; + } + }; + let local_sub = &sub.local; + if let Err(e) = ndb.unsubscribe(*local_sub) { + error!( + "failed to unsubscribe from object: {e}, subid:{}, {} active subscriptions", + local_sub.id(), + ndb.subscription_count() + ); + } else { + info!( + "Unsubscribed from object subid:{}. {} active subscriptions", + local_sub.id(), + ndb.subscription_count() + ); + } + + // unsub from remote + pool.unsubscribe(sub.remote.clone()); + self.sub = None; + } else { + info!( + "Locally unsubscribing. {} active ndb subscriptions. {} active subscriptions for this object", + ndb.subscription_count(), + self.subscribers, + ); + } + } + + pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) { + self.subscribers += 1; + if self.subscribers == 1 { + if self.sub.is_some() { + error!("Object is first subscriber, but it already had remote subscription"); + return; + } + + self.sub = Self::real_subscribe(ndb, pool, self.filters.clone()); + info!( + "Remotely subscribing to object. {} total active subscriptions, {} on this object", + ndb.subscription_count(), + self.subscribers, + ); + + if self.sub.is_none() { + error!("Error subscribing remotely to object"); + } + } else { + info!( + "Locally subscribing. {} total active subscriptions, {} for this object", + ndb.subscription_count(), + self.subscribers, + ) + } + } +} From 814736f9b3306be7d379ca7229a44f930397a1ee Mon Sep 17 00:00:00 2001 From: kernelkind Date: Wed, 18 Sep 2024 13:31:24 -0400 Subject: [PATCH 2/2] implement multi_subscriber for Thread Signed-off-by: kernelkind --- src/actionbar.rs | 46 +++---------- src/multi_subscriber.rs | 32 ++++++++- src/thread.rs | 147 +++++----------------------------------- 3 files changed, 56 insertions(+), 169 deletions(-) diff --git a/src/actionbar.rs b/src/actionbar.rs index ca452b64..ad7e2117 100644 --- a/src/actionbar.rs +++ b/src/actionbar.rs @@ -1,4 +1,5 @@ use crate::{ + multi_subscriber::MultiSubscriber, note::NoteRef, notecache::NoteCache, route::{Route, Router}, @@ -6,8 +7,6 @@ use crate::{ }; use enostr::{NoteId, RelayPool}; use nostrdb::{Ndb, Transaction}; -use tracing::{error, info}; -use uuid::Uuid; #[derive(Debug, Eq, PartialEq, Copy, Clone)] pub enum BarAction { @@ -69,42 +68,15 @@ fn open_thread( ThreadResult::Fresh(thread) => (thread, None), }; - // only start a subscription on nav and if we don't have - // an active subscription for this thread. - if thread.subscription().is_none() { - let filters = Thread::filters(root_id); - *thread.subscription_mut() = ndb.subscribe(&filters).ok(); - - if thread.remote_subscription().is_some() { - error!("Found active remote subscription when it was not expected"); - } else { - let subid = Uuid::new_v4().to_string(); - *thread.remote_subscription_mut() = Some(subid.clone()); - pool.subscribe(subid, filters); - } - - match thread.subscription() { - Some(_sub) => { - thread.subscribers += 1; - info!( - "Locally/remotely subscribing to thread. {} total active subscriptions, {} on this thread", - ndb.subscription_count(), - thread.subscribers, - ); - } - None => error!( - "Error subscribing locally to selected note '{}''s thread", - hex::encode(selected_note) - ), - } + let multi_subscriber = if let Some(multi_subscriber) = &mut thread.multi_subscriber { + multi_subscriber } else { - thread.subscribers += 1; - info!( - "Re-using existing thread subscription. {} total active subscriptions, {} on this thread", - ndb.subscription_count(), - thread.subscribers, - ) - } + let filters = Thread::filters(root_id); + thread.multi_subscriber = Some(MultiSubscriber::new(filters)); + thread.multi_subscriber.as_mut().unwrap() + }; + + multi_subscriber.subscribe(ndb, pool); result } diff --git a/src/multi_subscriber.rs b/src/multi_subscriber.rs index 8379e44d..301dc7c6 100644 --- a/src/multi_subscriber.rs +++ b/src/multi_subscriber.rs @@ -1,9 +1,9 @@ use enostr::{Filter, RelayPool}; -use nostrdb::Ndb; -use tracing::{error, info}; +use nostrdb::{Ndb, Note, Transaction}; +use tracing::{debug, error, info}; use uuid::Uuid; -use crate::filter::UnifiedSubscription; +use crate::{filter::UnifiedSubscription, note::NoteRef, Error}; pub struct MultiSubscriber { filters: Vec, @@ -104,4 +104,30 @@ impl MultiSubscriber { ) } } + + pub fn poll_for_notes(&mut self, ndb: &Ndb, txn: &Transaction) -> Result, Error> { + let sub = self.sub.as_ref().ok_or(Error::no_active_sub())?; + let new_note_keys = ndb.poll_for_notes(sub.local, 500); + + if new_note_keys.is_empty() { + return Ok(vec![]); + } else { + debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys); + } + + let mut notes: Vec> = Vec::with_capacity(new_note_keys.len()); + for key in new_note_keys { + let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { + note + } else { + continue; + }; + + notes.push(note); + } + + let note_refs: Vec = notes.iter().map(|n| NoteRef::from_note(n)).collect(); + + Ok(note_refs) + } } diff --git a/src/thread.rs b/src/thread.rs index baa747a4..b368075e 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -1,27 +1,19 @@ use crate::{ + multi_subscriber::MultiSubscriber, note::NoteRef, notecache::NoteCache, timeline::{TimelineTab, ViewFilter}, Error, Result, }; use enostr::RelayPool; -use nostrdb::{Filter, FilterBuilder, Ndb, Note, Subscription, Transaction}; -use std::cmp::Ordering; +use nostrdb::{Filter, FilterBuilder, Ndb, Transaction}; use std::collections::HashMap; -use tracing::{debug, error, info, warn}; +use tracing::{debug, warn}; #[derive(Default)] pub struct Thread { view: TimelineTab, - sub: Option, - remote_sub: Option, - pub subscribers: i32, -} - -#[derive(Debug, Eq, PartialEq, Copy, Clone)] -pub enum DecrementResult { - LastSubscriber(Subscription), - ActiveSubscribers, + pub multi_subscriber: Option, } impl Thread { @@ -32,15 +24,10 @@ impl Thread { } let mut view = TimelineTab::new_with_capacity(ViewFilter::NotesAndReplies, cap); view.notes = notes; - let sub: Option = None; - let remote_sub: Option = None; - let subscribers: i32 = 0; Thread { view, - sub, - remote_sub, - subscribers, + multi_subscriber: None, } } @@ -53,37 +40,18 @@ impl Thread { } #[must_use = "UnknownIds::update_from_note_refs should be used on this result"] - pub fn poll_notes_into_view<'a>( - &mut self, - txn: &'a Transaction, - ndb: &Ndb, - ) -> Result>> { - let sub = self.subscription().expect("thread subscription"); - let new_note_keys = ndb.poll_for_notes(sub, 500); - if new_note_keys.is_empty() { - return Ok(vec![]); - } else { - debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys); - } - - let mut notes: Vec> = Vec::with_capacity(new_note_keys.len()); - for key in new_note_keys { - let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { - note - } else { - continue; - }; - - notes.push(note); - } - - { + pub fn poll_notes_into_view(&mut self, txn: &Transaction, ndb: &Ndb) -> Result<()> { + if let Some(multi_subscriber) = &mut self.multi_subscriber { let reversed = true; - let note_refs: Vec = notes.iter().map(|n| NoteRef::from_note(n)).collect(); + let note_refs: Vec = multi_subscriber.poll_for_notes(ndb, txn)?; self.view.insert(¬e_refs, reversed); + } else { + return Err(Error::Generic( + "Thread unexpectedly has no MultiSubscriber".to_owned(), + )); } - Ok(notes) + Ok(()) } /// Look for new thread notes since our last fetch @@ -112,38 +80,6 @@ impl Thread { } } - pub fn decrement_sub(&mut self) -> Result { - self.subscribers -= 1; - - match self.subscribers.cmp(&0) { - Ordering::Equal => { - if let Some(sub) = self.subscription() { - Ok(DecrementResult::LastSubscriber(sub)) - } else { - Err(Error::no_active_sub()) - } - } - Ordering::Less => Err(Error::unexpected_sub_count(self.subscribers)), - Ordering::Greater => Ok(DecrementResult::ActiveSubscribers), - } - } - - pub fn subscription(&self) -> Option { - self.sub - } - - pub fn remote_subscription(&self) -> &Option { - &self.remote_sub - } - - pub fn remote_subscription_mut(&mut self) -> &mut Option { - &mut self.remote_sub - } - - pub fn subscription_mut(&mut self) -> &mut Option { - &mut self.sub - } - fn filters_raw(root: &[u8; 32]) -> Vec { vec![ nostrdb::Filter::new().kinds([1]).event(root), @@ -253,59 +189,12 @@ pub fn thread_unsubscribe( note_cache: &mut NoteCache, id: &[u8; 32], ) { - let (unsubscribe, remote_subid) = { - let txn = Transaction::new(ndb).expect("txn"); - let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, &txn, id); + let txn = Transaction::new(ndb).expect("txn"); + let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, &txn, id); - let thread = threads.thread_mut(ndb, &txn, root_id).get_ptr(); - let unsub = thread.decrement_sub(); + let thread = threads.thread_mut(ndb, &txn, root_id).get_ptr(); - let mut remote_subid: Option = None; - if let Ok(DecrementResult::LastSubscriber(_subid)) = unsub { - *thread.subscription_mut() = None; - remote_subid = thread.remote_subscription().to_owned(); - *thread.remote_subscription_mut() = None; - } - - (unsub, remote_subid) - }; - - match unsubscribe { - Ok(DecrementResult::LastSubscriber(sub)) => { - if let Err(e) = ndb.unsubscribe(sub) { - error!( - "failed to unsubscribe from thread: {e}, subid:{}, {} active subscriptions", - sub.id(), - ndb.subscription_count() - ); - } else { - info!( - "Unsubscribed from thread subid:{}. {} active subscriptions", - sub.id(), - ndb.subscription_count() - ); - } - - // unsub from remote - if let Some(subid) = remote_subid { - pool.unsubscribe(subid); - } - } - - Ok(DecrementResult::ActiveSubscribers) => { - info!( - "Keeping thread subscription. {} active subscriptions.", - ndb.subscription_count() - ); - // do nothing - } - - Err(e) => { - // something is wrong! - error!( - "Thread unsubscribe error: {e}. {} active subsciptions.", - ndb.subscription_count() - ); - } + if let Some(multi_subscriber) = &mut thread.multi_subscriber { + multi_subscriber.unsubscribe(ndb, pool); } }