local thread subscriptions
This adds local nostrdb thread subscriptions. When navigating to a thread, we first check to see if we have any active nostrdb subscriptions for that thread. If not, we create a new subscription. If we do, we re-use that subscription. This works by storing thread state in the Threads struct in the Damus application state. When we pop a route, we check to see if its a thread route. If it is, then we try to unsubscribe, but only if that is the last remaining subscriber for that thread, as there could be more than one. Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
173
src/app.rs
173
src/app.rs
@@ -2,7 +2,6 @@ use crate::account_manager::AccountManager;
|
||||
use crate::app_creation::setup_cc;
|
||||
use crate::app_style::user_requested_visuals_change;
|
||||
use crate::draft::Drafts;
|
||||
use crate::error::Error;
|
||||
use crate::frame_history::FrameHistory;
|
||||
use crate::imgcache::ImageCache;
|
||||
use crate::key_storage::KeyStorageType;
|
||||
@@ -10,9 +9,9 @@ use crate::note::NoteRef;
|
||||
use crate::notecache::{CachedNote, NoteCache};
|
||||
use crate::relay_pool_manager::RelayPoolManager;
|
||||
use crate::route::Route;
|
||||
use crate::thread::Threads;
|
||||
use crate::thread::{DecrementResult, Threads};
|
||||
use crate::timeline;
|
||||
use crate::timeline::{MergeKind, Timeline, ViewFilter};
|
||||
use crate::timeline::{Timeline, TimelineSource, ViewFilter};
|
||||
use crate::ui::note::PostAction;
|
||||
use crate::ui::{self, AccountSelectionWidget, DesktopGlobalPopup};
|
||||
use crate::ui::{DesktopSidePanel, RelayView, View};
|
||||
@@ -231,7 +230,8 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
|
||||
let txn = Transaction::new(&damus.ndb)?;
|
||||
let mut unknown_ids: HashSet<UnknownId> = HashSet::new();
|
||||
for timeline in 0..damus.timelines.len() {
|
||||
if let Err(err) = poll_notes_for_timeline(damus, &txn, timeline, &mut unknown_ids) {
|
||||
let src = TimelineSource::column(timeline);
|
||||
if let Err(err) = src.poll_notes_into_view(damus, &txn, &mut unknown_ids) {
|
||||
error!("{}", err);
|
||||
}
|
||||
}
|
||||
@@ -250,7 +250,7 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
|
||||
}
|
||||
|
||||
#[derive(Hash, Clone, Copy, PartialEq, Eq)]
|
||||
enum UnknownId<'a> {
|
||||
pub enum UnknownId<'a> {
|
||||
Pubkey(&'a [u8; 32]),
|
||||
Id(&'a [u8; 32]),
|
||||
}
|
||||
@@ -271,7 +271,7 @@ impl<'a> UnknownId<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_unknown_note_ids<'a>(
|
||||
pub fn get_unknown_note_ids<'a>(
|
||||
ndb: &Ndb,
|
||||
_cached_note: &CachedNote,
|
||||
txn: &'a Transaction,
|
||||
@@ -354,103 +354,6 @@ fn get_unknown_note_ids<'a>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_notes_for_timeline<'a>(
|
||||
damus: &mut Damus,
|
||||
txn: &'a Transaction,
|
||||
timeline_ind: usize,
|
||||
ids: &mut HashSet<UnknownId<'a>>,
|
||||
) -> Result<()> {
|
||||
let sub = if let Some(sub) = &damus.timelines[timeline_ind].subscription {
|
||||
sub
|
||||
} else {
|
||||
return Err(Error::NoActiveSubscription);
|
||||
};
|
||||
|
||||
let new_note_ids = damus.ndb.poll_for_notes(sub, 100);
|
||||
if new_note_ids.is_empty() {
|
||||
return Ok(());
|
||||
} else {
|
||||
debug!("{} new notes! {:?}", new_note_ids.len(), new_note_ids);
|
||||
}
|
||||
|
||||
let new_refs: Vec<(Note, NoteRef)> = new_note_ids
|
||||
.iter()
|
||||
.map(|key| {
|
||||
let note = damus.ndb.get_note_by_key(txn, *key).expect("no note??");
|
||||
let cached_note = damus
|
||||
.note_cache_mut()
|
||||
.cached_note_or_insert(*key, ¬e)
|
||||
.clone();
|
||||
let _ = get_unknown_note_ids(&damus.ndb, &cached_note, txn, ¬e, *key, ids);
|
||||
|
||||
let created_at = note.created_at();
|
||||
(
|
||||
note,
|
||||
NoteRef {
|
||||
key: *key,
|
||||
created_at,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
// ViewFilter::NotesAndReplies
|
||||
{
|
||||
let refs: Vec<NoteRef> = new_refs.iter().map(|(_note, nr)| *nr).collect();
|
||||
|
||||
insert_notes_into_timeline(damus, timeline_ind, ViewFilter::NotesAndReplies, &refs)
|
||||
}
|
||||
|
||||
//
|
||||
// handle the filtered case (ViewFilter::Notes, no replies)
|
||||
//
|
||||
// TODO(jb55): this is mostly just copied from above, let's just use a loop
|
||||
// I initially tried this but ran into borrow checker issues
|
||||
{
|
||||
let mut filtered_refs = Vec::with_capacity(new_refs.len());
|
||||
for (note, nr) in &new_refs {
|
||||
let cached_note = damus.note_cache_mut().cached_note_or_insert(nr.key, note);
|
||||
|
||||
if ViewFilter::filter_notes(cached_note, note) {
|
||||
filtered_refs.push(*nr);
|
||||
}
|
||||
}
|
||||
|
||||
insert_notes_into_timeline(damus, timeline_ind, ViewFilter::Notes, &filtered_refs);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn insert_notes_into_timeline(
|
||||
app: &mut Damus,
|
||||
timeline_ind: usize,
|
||||
filter: ViewFilter,
|
||||
new_refs: &[NoteRef],
|
||||
) {
|
||||
let timeline = &mut app.timelines[timeline_ind];
|
||||
let num_prev_items = timeline.notes(filter).len();
|
||||
let (notes, merge_kind) = timeline::merge_sorted_vecs(timeline.notes(filter), new_refs);
|
||||
debug!(
|
||||
"got merge kind {:?} for {:?} on timeline {}",
|
||||
merge_kind, filter, timeline_ind
|
||||
);
|
||||
|
||||
timeline.view_mut(filter).notes = notes;
|
||||
let new_items = timeline.notes(filter).len() - num_prev_items;
|
||||
|
||||
// TODO: technically items could have been added inbetween
|
||||
if new_items > 0 {
|
||||
let mut list = app.timelines[timeline_ind].view(filter).list.borrow_mut();
|
||||
|
||||
match merge_kind {
|
||||
// TODO: update egui_virtual_list to support spliced inserts
|
||||
MergeKind::Spliced => list.reset(),
|
||||
MergeKind::FrontInsert => list.items_inserted_at_start(new_items),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "profiling")]
|
||||
fn setup_profiling() {
|
||||
puffin::set_scopes_on(true); // tell puffin to collect data
|
||||
@@ -787,7 +690,7 @@ impl Damus {
|
||||
// TODO: should pull this from settings
|
||||
None,
|
||||
// TODO: use correct KeyStorage mechanism for current OS arch
|
||||
determine_key_storage_type(),
|
||||
KeyStorageType::None,
|
||||
);
|
||||
|
||||
for key in parsed_args.keys {
|
||||
@@ -996,6 +899,50 @@ fn render_panel(ctx: &egui::Context, app: &mut Damus, timeline_ind: usize) {
|
||||
});
|
||||
}
|
||||
|
||||
/// Local thread unsubscribe
|
||||
fn thread_unsubscribe(app: &mut Damus, id: &[u8; 32]) {
|
||||
let unsubscribe = {
|
||||
let txn = Transaction::new(&app.ndb).expect("txn");
|
||||
let root_id = crate::note::root_note_id_from_selected_id(app, &txn, id);
|
||||
|
||||
debug!("thread unsubbing from root_id {}", hex::encode(root_id));
|
||||
|
||||
app.threads
|
||||
.thread_mut(&app.ndb, &txn, root_id)
|
||||
.decrement_sub()
|
||||
};
|
||||
|
||||
match unsubscribe {
|
||||
Ok(DecrementResult::LastSubscriber(sub_id)) => {
|
||||
if let Err(e) = app.ndb.unsubscribe(sub_id) {
|
||||
error!("failed to unsubscribe from thread: {e}, subid:{sub_id}, {} active subscriptions", app.ndb.subscription_count());
|
||||
} else {
|
||||
info!(
|
||||
"Unsubscribed from thread subid:{}. {} active subscriptions",
|
||||
sub_id,
|
||||
app.ndb.subscription_count()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(DecrementResult::ActiveSubscribers) => {
|
||||
info!(
|
||||
"Keeping thread subscription. {} active subscriptions.",
|
||||
app.ndb.subscription_count()
|
||||
);
|
||||
// do nothing
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
// something is wrong!
|
||||
error!(
|
||||
"Thread unsubscribe error: {e}. {} active subsciptions.",
|
||||
app.ndb.subscription_count()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn render_nav(routes: Vec<Route>, timeline_ind: usize, app: &mut Damus, ui: &mut egui::Ui) {
|
||||
let navigating = app.timelines[timeline_ind].navigating;
|
||||
let returning = app.timelines[timeline_ind].returning;
|
||||
@@ -1026,12 +973,7 @@ fn render_nav(routes: Vec<Route>, timeline_ind: usize, app: &mut Damus, ui: &mut
|
||||
|
||||
Route::Thread(id) => {
|
||||
let app = &mut app_ctx.borrow_mut();
|
||||
if let Ok(txn) = Transaction::new(&app.ndb) {
|
||||
if let Ok(note) = app.ndb.get_note_by_id(&txn, id.bytes()) {
|
||||
ui::ThreadView::new(app, timeline_ind, ¬e).ui(ui);
|
||||
}
|
||||
}
|
||||
|
||||
ui::ThreadView::new(app, timeline_ind, id.bytes()).ui(ui);
|
||||
None
|
||||
}
|
||||
|
||||
@@ -1063,18 +1005,21 @@ fn render_nav(routes: Vec<Route>, timeline_ind: usize, app: &mut Damus, ui: &mut
|
||||
}
|
||||
});
|
||||
|
||||
let mut app = app_ctx.borrow_mut();
|
||||
if let Some(reply_response) = nav_response.inner {
|
||||
if let Some(PostAction::Post(_np)) = reply_response.inner.action {
|
||||
app_ctx.borrow_mut().timelines[timeline_ind].returning = true;
|
||||
app.timelines[timeline_ind].returning = true;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(NavAction::Returned) = nav_response.action {
|
||||
let mut app = app_ctx.borrow_mut();
|
||||
app.timelines[timeline_ind].routes.pop();
|
||||
let popped = app.timelines[timeline_ind].routes.pop();
|
||||
if let Some(Route::Thread(id)) = popped {
|
||||
thread_unsubscribe(&mut app, id.bytes());
|
||||
}
|
||||
app.timelines[timeline_ind].returning = false;
|
||||
} else if let Some(NavAction::Navigated) = nav_response.action {
|
||||
app_ctx.borrow_mut().timelines[timeline_ind].navigating = false;
|
||||
app.timelines[timeline_ind].navigating = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user