threads: check for new notes locally when thread is re-opened
We have a NoteRef cache for threads in memory, which is just a list of NoteKeys and timestamps. When reopening a thread, query the local DB to see if there are any new notes that we might have missed because we weren't actively subscribed to them. Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
124
src/actionbar.rs
124
src/actionbar.rs
@@ -1,4 +1,9 @@
|
|||||||
use crate::{route::Route, thread::Thread, Damus};
|
use crate::{
|
||||||
|
note::NoteRef,
|
||||||
|
route::Route,
|
||||||
|
thread::{Thread, ThreadResult},
|
||||||
|
Damus,
|
||||||
|
};
|
||||||
use enostr::NoteId;
|
use enostr::NoteId;
|
||||||
use nostrdb::Transaction;
|
use nostrdb::Transaction;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
@@ -9,6 +14,79 @@ pub enum BarAction {
|
|||||||
OpenThread,
|
OpenThread,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum BarResult {
|
||||||
|
NewThreadNotes(Vec<NoteRef>),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// open_thread is called when a note is selected and we need to navigate
|
||||||
|
/// to a thread It is responsible for managing the subscription and
|
||||||
|
/// making sure the thread is up to date. In a sense, it's a model for
|
||||||
|
/// the thread view. We don't have a concept of model/view/controller etc
|
||||||
|
/// in egui, but this is the closest thing to that.
|
||||||
|
fn open_thread(
|
||||||
|
app: &mut Damus,
|
||||||
|
txn: &Transaction,
|
||||||
|
timeline: usize,
|
||||||
|
selected_note: &[u8; 32],
|
||||||
|
) -> Option<BarResult> {
|
||||||
|
{
|
||||||
|
let timeline = &mut app.timelines[timeline];
|
||||||
|
timeline
|
||||||
|
.routes
|
||||||
|
.push(Route::Thread(NoteId::new(selected_note.to_owned())));
|
||||||
|
timeline.navigating = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
let root_id = crate::note::root_note_id_from_selected_id(app, txn, selected_note);
|
||||||
|
let thread_res = app.threads.thread_mut(&app.ndb, txn, root_id);
|
||||||
|
|
||||||
|
// The thread is stale, let's update it
|
||||||
|
let (thread, result) = match thread_res {
|
||||||
|
ThreadResult::Stale(thread) => {
|
||||||
|
let notes = Thread::new_notes(&thread.view.notes, root_id, txn, &app.ndb);
|
||||||
|
//
|
||||||
|
// we can't insert and update the VirtualList now, because we
|
||||||
|
// are already borrowing it mutably. Let's pass it as a
|
||||||
|
// result instead
|
||||||
|
//
|
||||||
|
// thread.view.insert(¬es);
|
||||||
|
(thread, Some(BarResult::NewThreadNotes(notes)))
|
||||||
|
}
|
||||||
|
|
||||||
|
ThreadResult::Fresh(thread) => (thread, None),
|
||||||
|
};
|
||||||
|
|
||||||
|
// only start a subscription on nav and if we don't have
|
||||||
|
// an active subscription for this thread.
|
||||||
|
if thread.subscription().is_none() {
|
||||||
|
*thread.subscription_mut() = app.ndb.subscribe(Thread::filters(root_id)).ok();
|
||||||
|
|
||||||
|
match thread.subscription() {
|
||||||
|
Some(_sub) => {
|
||||||
|
thread.subscribers += 1;
|
||||||
|
info!(
|
||||||
|
"Locally subscribing to thread. {} total active subscriptions, {} on this thread",
|
||||||
|
app.ndb.subscription_count(),
|
||||||
|
thread.subscribers,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
None => warn!(
|
||||||
|
"Error subscribing locally to selected note '{}''s thread",
|
||||||
|
hex::encode(selected_note)
|
||||||
|
),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
thread.subscribers += 1;
|
||||||
|
info!(
|
||||||
|
"Re-using existing thread subscription. {} total active subscriptions, {} on this thread",
|
||||||
|
app.ndb.subscription_count(),
|
||||||
|
thread.subscribers,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
impl BarAction {
|
impl BarAction {
|
||||||
pub fn execute(
|
pub fn execute(
|
||||||
self,
|
self,
|
||||||
@@ -16,7 +94,7 @@ impl BarAction {
|
|||||||
timeline: usize,
|
timeline: usize,
|
||||||
replying_to: &[u8; 32],
|
replying_to: &[u8; 32],
|
||||||
txn: &Transaction,
|
txn: &Transaction,
|
||||||
) {
|
) -> Option<BarResult> {
|
||||||
match self {
|
match self {
|
||||||
BarAction::Reply => {
|
BarAction::Reply => {
|
||||||
let timeline = &mut app.timelines[timeline];
|
let timeline = &mut app.timelines[timeline];
|
||||||
@@ -24,48 +102,10 @@ impl BarAction {
|
|||||||
.routes
|
.routes
|
||||||
.push(Route::Reply(NoteId::new(replying_to.to_owned())));
|
.push(Route::Reply(NoteId::new(replying_to.to_owned())));
|
||||||
timeline.navigating = true;
|
timeline.navigating = true;
|
||||||
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
BarAction::OpenThread => {
|
BarAction::OpenThread => open_thread(app, txn, timeline, replying_to),
|
||||||
{
|
|
||||||
let timeline = &mut app.timelines[timeline];
|
|
||||||
timeline
|
|
||||||
.routes
|
|
||||||
.push(Route::Thread(NoteId::new(replying_to.to_owned())));
|
|
||||||
timeline.navigating = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
let root_id = crate::note::root_note_id_from_selected_id(app, txn, replying_to);
|
|
||||||
let thread = app.threads.thread_mut(&app.ndb, txn, root_id);
|
|
||||||
|
|
||||||
// only start a subscription on nav and if we don't have
|
|
||||||
// an active subscription for this thread.
|
|
||||||
if thread.subscription().is_none() {
|
|
||||||
*thread.subscription_mut() = app.ndb.subscribe(Thread::filters(root_id)).ok();
|
|
||||||
|
|
||||||
match thread.subscription() {
|
|
||||||
Some(_sub) => {
|
|
||||||
thread.subscribers += 1;
|
|
||||||
info!(
|
|
||||||
"Locally subscribing to thread. {} total active subscriptions, {} on this thread",
|
|
||||||
app.ndb.subscription_count(),
|
|
||||||
thread.subscribers,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
None => warn!(
|
|
||||||
"Error subscribing locally to selected note '{}''s thread",
|
|
||||||
hex::encode(replying_to)
|
|
||||||
),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
thread.subscribers += 1;
|
|
||||||
info!(
|
|
||||||
"Re-using existing thread subscription. {} total active subscriptions, {} on this thread",
|
|
||||||
app.ndb.subscription_count(),
|
|
||||||
thread.subscribers,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
11
src/app.rs
11
src/app.rs
@@ -887,9 +887,14 @@ fn thread_unsubscribe(app: &mut Damus, id: &[u8; 32]) {
|
|||||||
|
|
||||||
debug!("thread unsubbing from root_id {}", hex::encode(root_id));
|
debug!("thread unsubbing from root_id {}", hex::encode(root_id));
|
||||||
|
|
||||||
app.threads
|
let thread = app.threads.thread_mut(&app.ndb, &txn, root_id).get_ptr();
|
||||||
.thread_mut(&app.ndb, &txn, root_id)
|
let unsub = thread.decrement_sub();
|
||||||
.decrement_sub()
|
|
||||||
|
if let Ok(DecrementResult::LastSubscriber(_subid)) = unsub {
|
||||||
|
*thread.subscription_mut() = None;
|
||||||
|
}
|
||||||
|
|
||||||
|
unsub
|
||||||
};
|
};
|
||||||
|
|
||||||
match unsubscribe {
|
match unsubscribe {
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use crate::note::NoteRef;
|
|||||||
use crate::timeline::{TimelineTab, ViewFilter};
|
use crate::timeline::{TimelineTab, ViewFilter};
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
use nostrdb::{Filter, Ndb, Subscription, Transaction};
|
use nostrdb::{Filter, Ndb, Subscription, Transaction};
|
||||||
|
use std::cmp::Ordering;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
@@ -51,15 +52,13 @@ impl Thread {
|
|||||||
let filters = Thread::filters_since(root_id, last_note.created_at - 60);
|
let filters = Thread::filters_since(root_id, last_note.created_at - 60);
|
||||||
|
|
||||||
if let Ok(results) = ndb.query(txn, filters, 1000) {
|
if let Ok(results) = ndb.query(txn, filters, 1000) {
|
||||||
|
debug!("got {} results from thread update", results.len());
|
||||||
results
|
results
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(NoteRef::from_query_result)
|
.map(NoteRef::from_query_result)
|
||||||
.collect()
|
.collect()
|
||||||
} else {
|
} else {
|
||||||
debug!(
|
debug!("got no results from thread update",);
|
||||||
"got no results from thread update for {}",
|
|
||||||
hex::encode(root_id)
|
|
||||||
);
|
|
||||||
vec![]
|
vec![]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -67,17 +66,17 @@ impl Thread {
|
|||||||
pub fn decrement_sub(&mut self) -> Result<DecrementResult, Error> {
|
pub fn decrement_sub(&mut self) -> Result<DecrementResult, Error> {
|
||||||
debug!("decrementing sub {:?}", self.subscription().map(|s| s.id));
|
debug!("decrementing sub {:?}", self.subscription().map(|s| s.id));
|
||||||
self.subscribers -= 1;
|
self.subscribers -= 1;
|
||||||
if self.subscribers == 0 {
|
|
||||||
// unsub from thread
|
match self.subscribers.cmp(&0) {
|
||||||
if let Some(sub) = self.subscription() {
|
Ordering::Equal => {
|
||||||
Ok(DecrementResult::LastSubscriber(sub.id))
|
if let Some(sub) = self.subscription() {
|
||||||
} else {
|
Ok(DecrementResult::LastSubscriber(sub.id))
|
||||||
Err(Error::no_active_sub())
|
} else {
|
||||||
|
Err(Error::no_active_sub())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if self.subscribers < 0 {
|
Ordering::Less => Err(Error::unexpected_sub_count(self.subscribers)),
|
||||||
Err(Error::unexpected_sub_count(self.subscribers))
|
Ordering::Greater => Ok(DecrementResult::ActiveSubscribers),
|
||||||
} else {
|
|
||||||
Ok(DecrementResult::ActiveSubscribers)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,6 +120,27 @@ pub struct Threads {
|
|||||||
pub root_id_to_thread: HashMap<[u8; 32], Thread>,
|
pub root_id_to_thread: HashMap<[u8; 32], Thread>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum ThreadResult<'a> {
|
||||||
|
Fresh(&'a mut Thread),
|
||||||
|
Stale(&'a mut Thread),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> ThreadResult<'a> {
|
||||||
|
pub fn get_ptr(self) -> &'a mut Thread {
|
||||||
|
match self {
|
||||||
|
Self::Fresh(ptr) => ptr,
|
||||||
|
Self::Stale(ptr) => ptr,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_stale(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
Self::Fresh(_ptr) => false,
|
||||||
|
Self::Stale(_ptr) => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Threads {
|
impl Threads {
|
||||||
pub fn thread_expected_mut(&mut self, root_id: &[u8; 32]) -> &mut Thread {
|
pub fn thread_expected_mut(&mut self, root_id: &[u8; 32]) -> &mut Thread {
|
||||||
self.root_id_to_thread
|
self.root_id_to_thread
|
||||||
@@ -129,17 +149,17 @@ impl Threads {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn thread_mut<'a>(
|
pub fn thread_mut<'a>(
|
||||||
&mut self,
|
&'a mut self,
|
||||||
ndb: &Ndb,
|
ndb: &Ndb,
|
||||||
txn: &Transaction,
|
txn: &Transaction,
|
||||||
root_id: &[u8; 32],
|
root_id: &[u8; 32],
|
||||||
) -> &mut Thread {
|
) -> ThreadResult<'a> {
|
||||||
// we can't use the naive hashmap entry API here because lookups
|
// we can't use the naive hashmap entry API here because lookups
|
||||||
// require a copy, wait until we have a raw entry api. We could
|
// require a copy, wait until we have a raw entry api. We could
|
||||||
// also use hashbrown?
|
// also use hashbrown?
|
||||||
|
|
||||||
if self.root_id_to_thread.contains_key(root_id) {
|
if self.root_id_to_thread.contains_key(root_id) {
|
||||||
return self.root_id_to_thread.get_mut(root_id).unwrap();
|
return ThreadResult::Stale(self.root_id_to_thread.get_mut(root_id).unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
// looks like we don't have this thread yet, populate it
|
// looks like we don't have this thread yet, populate it
|
||||||
@@ -150,7 +170,7 @@ impl Threads {
|
|||||||
debug!("couldnt find root note root_id:{}", hex::encode(root_id));
|
debug!("couldnt find root note root_id:{}", hex::encode(root_id));
|
||||||
self.root_id_to_thread
|
self.root_id_to_thread
|
||||||
.insert(root_id.to_owned(), Thread::new(vec![]));
|
.insert(root_id.to_owned(), Thread::new(vec![]));
|
||||||
return self.root_id_to_thread.get_mut(root_id).unwrap();
|
return ThreadResult::Fresh(self.root_id_to_thread.get_mut(root_id).unwrap());
|
||||||
};
|
};
|
||||||
|
|
||||||
// we don't have the thread, query for it!
|
// we don't have the thread, query for it!
|
||||||
@@ -172,7 +192,7 @@ impl Threads {
|
|||||||
debug!("found thread with {} notes", notes.len());
|
debug!("found thread with {} notes", notes.len());
|
||||||
self.root_id_to_thread
|
self.root_id_to_thread
|
||||||
.insert(root_id.to_owned(), Thread::new(notes));
|
.insert(root_id.to_owned(), Thread::new(notes));
|
||||||
self.root_id_to_thread.get_mut(root_id).unwrap()
|
ThreadResult::Fresh(self.root_id_to_thread.get_mut(root_id).unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
//fn thread_by_id(&self, ndb: &Ndb, id: &[u8; 32]) -> &mut Thread {
|
//fn thread_by_id(&self, ndb: &Ndb, id: &[u8; 32]) -> &mut Thread {
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ impl<'a> TimelineSource<'a> {
|
|||||||
let thread = if app.threads.root_id_to_thread.contains_key(root_id) {
|
let thread = if app.threads.root_id_to_thread.contains_key(root_id) {
|
||||||
app.threads.thread_expected_mut(root_id)
|
app.threads.thread_expected_mut(root_id)
|
||||||
} else {
|
} else {
|
||||||
app.threads.thread_mut(&app.ndb, txn, root_id)
|
app.threads.thread_mut(&app.ndb, txn, root_id).get_ptr()
|
||||||
};
|
};
|
||||||
|
|
||||||
&mut thread.view
|
&mut thread.view
|
||||||
@@ -57,7 +57,7 @@ impl<'a> TimelineSource<'a> {
|
|||||||
let thread = if app.threads.root_id_to_thread.contains_key(root_id) {
|
let thread = if app.threads.root_id_to_thread.contains_key(root_id) {
|
||||||
app.threads.thread_expected_mut(root_id)
|
app.threads.thread_expected_mut(root_id)
|
||||||
} else {
|
} else {
|
||||||
app.threads.thread_mut(&app.ndb, txn, root_id)
|
app.threads.thread_mut(&app.ndb, txn, root_id).get_ptr()
|
||||||
};
|
};
|
||||||
|
|
||||||
thread.subscription()
|
thread.subscription()
|
||||||
@@ -213,6 +213,9 @@ impl TimelineTab {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert(&mut self, new_refs: &[NoteRef]) {
|
pub fn insert(&mut self, new_refs: &[NoteRef]) {
|
||||||
|
if new_refs.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
let num_prev_items = self.notes.len();
|
let num_prev_items = self.notes.len();
|
||||||
let (notes, merge_kind) = crate::timeline::merge_sorted_vecs(&self.notes, new_refs);
|
let (notes, merge_kind) = crate::timeline::merge_sorted_vecs(&self.notes, new_refs);
|
||||||
|
|
||||||
|
|||||||
@@ -79,7 +79,11 @@ impl<'a> ThreadView<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let (len, list) = {
|
let (len, list) = {
|
||||||
let thread = self.app.threads.thread_mut(&self.app.ndb, &txn, root_id);
|
let thread = self
|
||||||
|
.app
|
||||||
|
.threads
|
||||||
|
.thread_mut(&self.app.ndb, &txn, root_id)
|
||||||
|
.get_ptr();
|
||||||
|
|
||||||
let len = thread.view.notes.len();
|
let len = thread.view.notes.len();
|
||||||
(len, &mut thread.view.list)
|
(len, &mut thread.view.list)
|
||||||
@@ -92,7 +96,11 @@ impl<'a> ThreadView<'a> {
|
|||||||
ui.spacing_mut().item_spacing.x = 4.0;
|
ui.spacing_mut().item_spacing.x = 4.0;
|
||||||
|
|
||||||
let note_key = {
|
let note_key = {
|
||||||
let thread = self.app.threads.thread_mut(&self.app.ndb, &txn, root_id);
|
let thread = self
|
||||||
|
.app
|
||||||
|
.threads
|
||||||
|
.thread_mut(&self.app.ndb, &txn, root_id)
|
||||||
|
.get_ptr();
|
||||||
thread.view.notes[start_index].key
|
thread.view.notes[start_index].key
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use crate::{draft::DraftSource, ui, ui::note::PostAction, Damus};
|
use crate::{actionbar::BarResult, draft::DraftSource, ui, ui::note::PostAction, Damus};
|
||||||
use egui::containers::scroll_area::ScrollBarVisibility;
|
use egui::containers::scroll_area::ScrollBarVisibility;
|
||||||
use egui::{Direction, Layout};
|
use egui::{Direction, Layout};
|
||||||
use egui_tabs::TabColor;
|
use egui_tabs::TabColor;
|
||||||
@@ -56,6 +56,7 @@ fn timeline_ui(ui: &mut egui::Ui, app: &mut Damus, timeline: usize, reversed: bo
|
|||||||
.show(ui, |ui| {
|
.show(ui, |ui| {
|
||||||
let view = app.timelines[timeline].current_view();
|
let view = app.timelines[timeline].current_view();
|
||||||
let len = view.notes.len();
|
let len = view.notes.len();
|
||||||
|
let mut bar_result: Option<BarResult> = None;
|
||||||
view.list
|
view.list
|
||||||
.clone()
|
.clone()
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
@@ -92,7 +93,10 @@ fn timeline_ui(ui: &mut egui::Ui, app: &mut Damus, timeline: usize, reversed: bo
|
|||||||
.show(ui);
|
.show(ui);
|
||||||
|
|
||||||
if let Some(action) = resp.action {
|
if let Some(action) = resp.action {
|
||||||
action.execute(app, timeline, note.id(), &txn);
|
let br = action.execute(app, timeline, note.id(), &txn);
|
||||||
|
if br.is_some() {
|
||||||
|
bar_result = br;
|
||||||
|
}
|
||||||
} else if resp.response.clicked() {
|
} else if resp.response.clicked() {
|
||||||
debug!("clicked note");
|
debug!("clicked note");
|
||||||
}
|
}
|
||||||
@@ -103,6 +107,16 @@ fn timeline_ui(ui: &mut egui::Ui, app: &mut Damus, timeline: usize, reversed: bo
|
|||||||
|
|
||||||
1
|
1
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if let Some(br) = bar_result {
|
||||||
|
match br {
|
||||||
|
// update the thread for next render if we have new notes
|
||||||
|
BarResult::NewThreadNotes(notes) => {
|
||||||
|
let view = app.timelines[timeline].current_view_mut();
|
||||||
|
view.insert(¬es);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user