162
src/thread.rs
162
src/thread.rs
@@ -1,14 +1,10 @@
|
||||
use crate::{
|
||||
multi_subscriber::MultiSubscriber,
|
||||
note::NoteRef,
|
||||
notecache::NoteCache,
|
||||
notes_holder::NotesHolder,
|
||||
timeline::{TimelineTab, ViewFilter},
|
||||
Error, Result,
|
||||
};
|
||||
use enostr::RelayPool;
|
||||
use nostrdb::{Filter, FilterBuilder, Ndb, Transaction};
|
||||
use std::collections::HashMap;
|
||||
use tracing::{debug, warn};
|
||||
use nostrdb::{Filter, FilterBuilder};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Thread {
|
||||
@@ -39,47 +35,6 @@ impl Thread {
|
||||
&mut self.view
|
||||
}
|
||||
|
||||
#[must_use = "UnknownIds::update_from_note_refs should be used on this result"]
|
||||
pub fn poll_notes_into_view(&mut self, txn: &Transaction, ndb: &Ndb) -> Result<()> {
|
||||
if let Some(multi_subscriber) = &mut self.multi_subscriber {
|
||||
let reversed = true;
|
||||
let note_refs: Vec<NoteRef> = multi_subscriber.poll_for_notes(ndb, txn)?;
|
||||
self.view.insert(¬e_refs, reversed);
|
||||
} else {
|
||||
return Err(Error::Generic(
|
||||
"Thread unexpectedly has no MultiSubscriber".to_owned(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Look for new thread notes since our last fetch
|
||||
pub fn new_notes(
|
||||
notes: &[NoteRef],
|
||||
root_id: &[u8; 32],
|
||||
txn: &Transaction,
|
||||
ndb: &Ndb,
|
||||
) -> Vec<NoteRef> {
|
||||
if notes.is_empty() {
|
||||
return vec![];
|
||||
}
|
||||
|
||||
let last_note = notes[0];
|
||||
let filters = Thread::filters_since(root_id, last_note.created_at + 1);
|
||||
|
||||
if let Ok(results) = ndb.query(txn, &filters, 1000) {
|
||||
debug!("got {} results from thread update", results.len());
|
||||
results
|
||||
.into_iter()
|
||||
.map(NoteRef::from_query_result)
|
||||
.collect()
|
||||
} else {
|
||||
debug!("got no results from thread update",);
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
|
||||
fn filters_raw(root: &[u8; 32]) -> Vec<FilterBuilder> {
|
||||
vec![
|
||||
nostrdb::Filter::new().kinds([1]).event(root),
|
||||
@@ -102,99 +57,28 @@ impl Thread {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Threads {
|
||||
/// root id to thread
|
||||
pub root_id_to_thread: HashMap<[u8; 32], Thread>,
|
||||
}
|
||||
|
||||
pub enum ThreadResult<'a> {
|
||||
Fresh(&'a mut Thread),
|
||||
Stale(&'a mut Thread),
|
||||
}
|
||||
|
||||
impl<'a> ThreadResult<'a> {
|
||||
pub fn get_ptr(self) -> &'a mut Thread {
|
||||
match self {
|
||||
Self::Fresh(ptr) => ptr,
|
||||
Self::Stale(ptr) => ptr,
|
||||
}
|
||||
impl NotesHolder for Thread {
|
||||
fn get_multi_subscriber(&mut self) -> Option<&mut MultiSubscriber> {
|
||||
self.multi_subscriber.as_mut()
|
||||
}
|
||||
|
||||
pub fn is_stale(&self) -> bool {
|
||||
match self {
|
||||
Self::Fresh(_ptr) => false,
|
||||
Self::Stale(_ptr) => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Threads {
|
||||
pub fn thread_expected_mut(&mut self, root_id: &[u8; 32]) -> &mut Thread {
|
||||
self.root_id_to_thread
|
||||
.get_mut(root_id)
|
||||
.expect("thread_expected_mut used but there was no thread")
|
||||
}
|
||||
|
||||
pub fn thread_mut<'a>(
|
||||
&'a mut self,
|
||||
ndb: &Ndb,
|
||||
txn: &Transaction,
|
||||
root_id: &[u8; 32],
|
||||
) -> ThreadResult<'a> {
|
||||
// we can't use the naive hashmap entry API here because lookups
|
||||
// require a copy, wait until we have a raw entry api. We could
|
||||
// also use hashbrown?
|
||||
|
||||
if self.root_id_to_thread.contains_key(root_id) {
|
||||
return ThreadResult::Stale(self.thread_expected_mut(root_id));
|
||||
}
|
||||
|
||||
// we don't have the thread, query for it!
|
||||
let filters = Thread::filters(root_id);
|
||||
|
||||
let notes = if let Ok(results) = ndb.query(txn, &filters, 1000) {
|
||||
results
|
||||
.into_iter()
|
||||
.map(NoteRef::from_query_result)
|
||||
.collect()
|
||||
} else {
|
||||
debug!(
|
||||
"got no results from thread lookup for {}",
|
||||
hex::encode(root_id)
|
||||
);
|
||||
vec![]
|
||||
};
|
||||
|
||||
if notes.is_empty() {
|
||||
warn!("thread query returned 0 notes? ")
|
||||
} else {
|
||||
debug!("found thread with {} notes", notes.len());
|
||||
}
|
||||
|
||||
self.root_id_to_thread
|
||||
.insert(root_id.to_owned(), Thread::new(notes));
|
||||
ThreadResult::Fresh(self.root_id_to_thread.get_mut(root_id).unwrap())
|
||||
}
|
||||
|
||||
//fn thread_by_id(&self, ndb: &Ndb, id: &[u8; 32]) -> &mut Thread {
|
||||
//}
|
||||
}
|
||||
|
||||
/// Local thread unsubscribe
|
||||
pub fn thread_unsubscribe(
|
||||
ndb: &Ndb,
|
||||
threads: &mut Threads,
|
||||
pool: &mut RelayPool,
|
||||
note_cache: &mut NoteCache,
|
||||
id: &[u8; 32],
|
||||
) {
|
||||
let txn = Transaction::new(ndb).expect("txn");
|
||||
let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, &txn, id);
|
||||
|
||||
let thread = threads.thread_mut(ndb, &txn, root_id).get_ptr();
|
||||
|
||||
if let Some(multi_subscriber) = &mut thread.multi_subscriber {
|
||||
multi_subscriber.unsubscribe(ndb, pool);
|
||||
fn filters(for_id: &[u8; 32]) -> Vec<Filter> {
|
||||
Thread::filters(for_id)
|
||||
}
|
||||
|
||||
fn new_notes_holder(notes: Vec<NoteRef>) -> Self {
|
||||
Thread::new(notes)
|
||||
}
|
||||
|
||||
fn get_view(&mut self) -> &mut TimelineTab {
|
||||
&mut self.view
|
||||
}
|
||||
|
||||
fn filters_since(for_id: &[u8; 32], since: u64) -> Vec<Filter> {
|
||||
Thread::filters_since(for_id, since)
|
||||
}
|
||||
|
||||
fn set_multi_subscriber(&mut self, subscriber: MultiSubscriber) {
|
||||
self.multi_subscriber = Some(subscriber);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user