Flexible routing

Another massive refactor to change the way routing works. Now any
column can route anywhere.

Also things are generally just much better and more modular via the
new struct split borrowing technique.

I didn't even try to split this into smaller commits for my sanity.

Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2024-09-11 19:43:41 -07:00
parent b4a8cddc48
commit 36c0971fd9
27 changed files with 973 additions and 963 deletions

View File

@@ -1,14 +1,18 @@
use crate::note::NoteRef;
use crate::timeline::{TimelineTab, ViewFilter};
use crate::Error;
use nostrdb::{Filter, FilterBuilder, Ndb, Subscription, Transaction};
use crate::{
note::NoteRef,
notecache::NoteCache,
timeline::{TimelineTab, ViewFilter},
Error, Result,
};
use enostr::RelayPool;
use nostrdb::{Filter, FilterBuilder, Ndb, Note, Subscription, Transaction};
use std::cmp::Ordering;
use std::collections::HashMap;
use tracing::{debug, warn};
use tracing::{debug, error, info, warn};
#[derive(Default)]
pub struct Thread {
pub view: TimelineTab,
view: TimelineTab,
sub: Option<Subscription>,
remote_sub: Option<String>,
pub subscribers: i32,
@@ -40,6 +44,48 @@ impl Thread {
}
}
pub fn view(&self) -> &TimelineTab {
&self.view
}
pub fn view_mut(&mut self) -> &mut TimelineTab {
&mut self.view
}
#[must_use = "UnknownIds::update_from_note_refs should be used on this result"]
pub fn poll_notes_into_view<'a>(
&mut self,
txn: &'a Transaction,
ndb: &Ndb,
) -> Result<Vec<Note<'a>>> {
let sub = self.subscription().expect("thread subscription");
let new_note_keys = ndb.poll_for_notes(sub, 500);
if new_note_keys.is_empty() {
return Ok(vec![]);
} else {
debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys);
}
let mut notes: Vec<Note<'a>> = Vec::with_capacity(new_note_keys.len());
for key in new_note_keys {
let note = if let Ok(note) = ndb.get_note_by_key(txn, key) {
note
} else {
continue;
};
notes.push(note);
}
{
let reversed = true;
let note_refs: Vec<NoteRef> = notes.iter().map(|n| NoteRef::from_note(n)).collect();
self.view.insert(&note_refs, reversed);
}
Ok(notes)
}
/// Look for new thread notes since our last fetch
pub fn new_notes(
notes: &[NoteRef],
@@ -66,7 +112,7 @@ impl Thread {
}
}
pub fn decrement_sub(&mut self) -> Result<DecrementResult, Error> {
pub fn decrement_sub(&mut self) -> Result<DecrementResult> {
self.subscribers -= 1;
match self.subscribers.cmp(&0) {
@@ -165,7 +211,7 @@ impl Threads {
// also use hashbrown?
if self.root_id_to_thread.contains_key(root_id) {
return ThreadResult::Stale(self.root_id_to_thread.get_mut(root_id).unwrap());
return ThreadResult::Stale(self.thread_expected_mut(root_id));
}
// we don't have the thread, query for it!
@@ -198,3 +244,68 @@ impl Threads {
//fn thread_by_id(&self, ndb: &Ndb, id: &[u8; 32]) -> &mut Thread {
//}
}
/// Local thread unsubscribe
pub fn thread_unsubscribe(
ndb: &Ndb,
threads: &mut Threads,
pool: &mut RelayPool,
note_cache: &mut NoteCache,
id: &[u8; 32],
) {
let (unsubscribe, remote_subid) = {
let txn = Transaction::new(ndb).expect("txn");
let root_id = crate::note::root_note_id_from_selected_id(ndb, note_cache, &txn, id);
let thread = threads.thread_mut(ndb, &txn, root_id).get_ptr();
let unsub = thread.decrement_sub();
let mut remote_subid: Option<String> = None;
if let Ok(DecrementResult::LastSubscriber(_subid)) = unsub {
*thread.subscription_mut() = None;
remote_subid = thread.remote_subscription().to_owned();
*thread.remote_subscription_mut() = None;
}
(unsub, remote_subid)
};
match unsubscribe {
Ok(DecrementResult::LastSubscriber(sub)) => {
if let Err(e) = ndb.unsubscribe(sub) {
error!(
"failed to unsubscribe from thread: {e}, subid:{}, {} active subscriptions",
sub.id(),
ndb.subscription_count()
);
} else {
info!(
"Unsubscribed from thread subid:{}. {} active subscriptions",
sub.id(),
ndb.subscription_count()
);
}
// unsub from remote
if let Some(subid) = remote_subid {
pool.unsubscribe(subid);
}
}
Ok(DecrementResult::ActiveSubscribers) => {
info!(
"Keeping thread subscription. {} active subscriptions.",
ndb.subscription_count()
);
// do nothing
}
Err(e) => {
// something is wrong!
error!(
"Thread unsubscribe error: {e}. {} active subsciptions.",
ndb.subscription_count()
);
}
}
}