Switch to unified timeline cache via TimelineKinds

This is a fairly large rewrite which unifies our threads, timelines and
profiles. Now all timelines have a MultiSubscriber, and can be added
and removed to columns just like Threads and Profiles.

Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2025-01-22 15:59:21 -08:00
parent d46e526a45
commit 0cc1d8a600
39 changed files with 1395 additions and 2055 deletions

View File

@@ -1,23 +1,21 @@
use crate::{
actionbar::TimelineOpenResult,
error::Error,
multi_subscriber::MultiSubscriber,
profile::Profile,
thread::Thread,
//subscriptions::SubRefs,
timeline::{PubkeySource, Timeline},
timeline::{Timeline, TimelineKind},
};
use notedeck::{NoteCache, NoteRef, RootNoteId, RootNoteIdBuf};
use notedeck::{filter, FilterState, NoteCache, NoteRef};
use enostr::{Pubkey, PubkeyRef, RelayPool};
use nostrdb::{Filter, FilterBuilder, Ndb, Transaction};
use enostr::RelayPool;
use nostrdb::{Filter, Ndb, Transaction};
use std::collections::HashMap;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
#[derive(Default)]
pub struct TimelineCache {
pub threads: HashMap<RootNoteIdBuf, Thread>,
pub profiles: HashMap<Pubkey, Profile>,
pub timelines: HashMap<TimelineKind, Timeline>,
}
pub enum Vitality<'a, M> {
@@ -41,102 +39,64 @@ impl<'a, M> Vitality<'a, M> {
}
}
#[derive(Hash, Debug, Copy, Clone)]
pub enum TimelineCacheKey<'a> {
Profile(PubkeyRef<'a>),
Thread(RootNoteId<'a>),
}
impl<'a> TimelineCacheKey<'a> {
pub fn profile(pubkey: PubkeyRef<'a>) -> Self {
Self::Profile(pubkey)
}
pub fn thread(root_id: RootNoteId<'a>) -> Self {
Self::Thread(root_id)
}
pub fn bytes(&self) -> &[u8; 32] {
match self {
Self::Profile(pk) => pk.bytes(),
Self::Thread(root_id) => root_id.bytes(),
}
}
/// The filters used to update our timeline cache
pub fn filters_raw(&self) -> Vec<FilterBuilder> {
match self {
TimelineCacheKey::Thread(root_id) => Thread::filters_raw(*root_id),
TimelineCacheKey::Profile(pubkey) => vec![Filter::new()
.authors([pubkey.bytes()])
.kinds([1])
.limit(notedeck::filter::default_limit())],
}
}
pub fn filters_since(&self, since: u64) -> Vec<Filter> {
self.filters_raw()
.into_iter()
.map(|fb| fb.since(since).build())
.collect()
}
pub fn filters(&self) -> Vec<Filter> {
self.filters_raw()
.into_iter()
.map(|mut fb| fb.build())
.collect()
}
}
impl TimelineCache {
fn contains_key(&self, key: TimelineCacheKey<'_>) -> bool {
match key {
TimelineCacheKey::Profile(pubkey) => self.profiles.contains_key(pubkey.bytes()),
TimelineCacheKey::Thread(root_id) => self.threads.contains_key(root_id.bytes()),
/// Pop a timeline from the timeline cache. This only removes the timeline
/// if it has reached 0 subscribers, meaning it was the last one to be
/// removed
pub fn pop(
&mut self,
id: &TimelineKind,
ndb: &mut Ndb,
pool: &mut RelayPool,
) -> Result<(), Error> {
let timeline = if let Some(timeline) = self.timelines.get_mut(id) {
timeline
} else {
return Err(Error::TimelineNotFound);
};
if let Some(sub) = &mut timeline.subscription {
// if this is the last subscriber, remove the timeline from cache
if sub.unsubscribe(ndb, pool) {
debug!(
"popped last timeline {:?}, removing from timeline cache",
id
);
self.timelines.remove(id);
}
Ok(())
} else {
Err(Error::MissingSubscription)
}
}
fn get_expected_mut(&mut self, key: TimelineCacheKey<'_>) -> &mut Timeline {
match key {
TimelineCacheKey::Profile(pubkey) => self
.profiles
.get_mut(pubkey.bytes())
.map(|p| &mut p.timeline),
TimelineCacheKey::Thread(root_id) => self
.threads
.get_mut(root_id.bytes())
.map(|t| &mut t.timeline),
}
.expect("expected notes in timline cache")
fn get_expected_mut(&mut self, key: &TimelineKind) -> &mut Timeline {
self.timelines
.get_mut(key)
.expect("expected notes in timline cache")
}
/// Insert a new profile or thread into the cache, based on the TimelineCacheKey
/// Insert a new timeline into the cache, based on the TimelineKind
#[allow(clippy::too_many_arguments)]
fn insert_new(
&mut self,
id: TimelineCacheKey<'_>,
id: TimelineKind,
txn: &Transaction,
ndb: &Ndb,
notes: &[NoteRef],
note_cache: &mut NoteCache,
filters: Vec<Filter>,
) {
match id {
TimelineCacheKey::Profile(pubkey) => {
let mut profile = Profile::new(PubkeySource::Explicit(pubkey.to_owned()), filters);
// insert initial notes into timeline
profile.timeline.insert_new(txn, ndb, note_cache, notes);
self.profiles.insert(pubkey.to_owned(), profile);
}
let mut timeline = if let Some(timeline) = id.clone().into_timeline(txn, ndb) {
timeline
} else {
error!("Error creating timeline from {:?}", &id);
return;
};
TimelineCacheKey::Thread(root_id) => {
let mut thread = Thread::new(root_id.to_owned());
thread.timeline.insert_new(txn, ndb, note_cache, notes);
self.threads.insert(root_id.to_owned(), thread);
}
}
// insert initial notes into timeline
timeline.insert_new(txn, ndb, note_cache, notes);
self.timelines.insert(id, timeline);
}
/// Get and/or update the notes associated with this timeline
@@ -145,24 +105,28 @@ impl TimelineCache {
ndb: &Ndb,
note_cache: &mut NoteCache,
txn: &Transaction,
id: TimelineCacheKey<'a>,
id: &TimelineKind,
) -> Vitality<'a, Timeline> {
// we can't use the naive hashmap entry API here because lookups
// require a copy, wait until we have a raw entry api. We could
// also use hashbrown?
if self.contains_key(id) {
if self.timelines.contains_key(id) {
return Vitality::Stale(self.get_expected_mut(id));
}
let filters = id.filters();
let notes = if let Ok(results) = ndb.query(txn, &filters, 1000) {
results
.into_iter()
.map(NoteRef::from_query_result)
.collect()
let notes = if let FilterState::Ready(filters) = id.filters(txn, ndb) {
if let Ok(results) = ndb.query(txn, &filters, 1000) {
results
.into_iter()
.map(NoteRef::from_query_result)
.collect()
} else {
debug!("got no results from TimelineCache lookup for {:?}", id);
vec![]
}
} else {
debug!("got no results from TimelineCache lookup for {:?}", id);
// filter is not ready yet
vec![]
};
@@ -172,44 +136,37 @@ impl TimelineCache {
info!("found NotesHolder with {} notes", notes.len());
}
self.insert_new(id, txn, ndb, &notes, note_cache, filters);
self.insert_new(id.to_owned(), txn, ndb, &notes, note_cache);
Vitality::Fresh(self.get_expected_mut(id))
}
pub fn subscription(
&mut self,
id: TimelineCacheKey<'_>,
) -> Option<&mut Option<MultiSubscriber>> {
match id {
TimelineCacheKey::Profile(pubkey) => self
.profiles
.get_mut(pubkey.bytes())
.map(|p| &mut p.subscription),
TimelineCacheKey::Thread(root_id) => self
.threads
.get_mut(root_id.bytes())
.map(|t| &mut t.subscription),
}
}
pub fn open<'a>(
/// Open a timeline, this is another way of saying insert a timeline
/// into the timeline cache. If there exists a timeline already, we
/// bump its subscription reference count. If it's new we start a new
/// subscription
pub fn open(
&mut self,
ndb: &Ndb,
note_cache: &mut NoteCache,
txn: &Transaction,
pool: &mut RelayPool,
id: TimelineCacheKey<'a>,
) -> Option<TimelineOpenResult<'a>> {
let result = match self.notes(ndb, note_cache, txn, id) {
id: &TimelineKind,
) -> Option<TimelineOpenResult> {
let (open_result, timeline) = match self.notes(ndb, note_cache, txn, id) {
Vitality::Stale(timeline) => {
// The timeline cache is stale, let's update it
let notes = find_new_notes(timeline.all_or_any_notes(), id, txn, ndb);
let cached_timeline_result = if notes.is_empty() {
let notes = find_new_notes(
timeline.all_or_any_notes(),
timeline.subscription.as_ref().map(|s| &s.filters)?,
txn,
ndb,
);
let open_result = if notes.is_empty() {
None
} else {
let new_notes = notes.iter().map(|n| n.key).collect();
Some(TimelineOpenResult::new_notes(new_notes, id))
Some(TimelineOpenResult::new_notes(new_notes, id.clone()))
};
// we can't insert and update the VirtualList now, because we
@@ -217,42 +174,36 @@ impl TimelineCache {
// result instead
//
// holder.get_view().insert(&notes); <-- no
cached_timeline_result
(open_result, timeline)
}
Vitality::Fresh(_timeline) => None,
Vitality::Fresh(timeline) => (None, timeline),
};
let sub_id = if let Some(sub) = self.subscription(id) {
if let Some(multi_subscriber) = sub {
multi_subscriber.subscribe(ndb, pool);
multi_subscriber.sub.as_ref().map(|s| s.local)
} else {
let mut multi_sub = MultiSubscriber::new(id.filters());
multi_sub.subscribe(ndb, pool);
let sub_id = multi_sub.sub.as_ref().map(|s| s.local);
*sub = Some(multi_sub);
sub_id
}
if let Some(multi_sub) = &mut timeline.subscription {
debug!("got open with *old* subscription for {:?}", &timeline.kind);
multi_sub.subscribe(ndb, pool);
} else if let Some(filter) = timeline.filter.get_any_ready() {
debug!("got open with *new* subscription for {:?}", &timeline.kind);
let mut multi_sub = MultiSubscriber::new(filter.clone());
multi_sub.subscribe(ndb, pool);
timeline.subscription = Some(multi_sub);
} else {
None
// This should never happen reasoning, self.notes would have
// failed above if the filter wasn't ready
error!(
"open: filter not ready, so could not setup subscription. this should never happen"
);
};
let timeline = self.get_expected_mut(id);
if let Some(sub_id) = sub_id {
timeline.subscription = Some(sub_id);
}
// TODO: We have subscription ids tracked in different places. Fix this
result
open_result
}
}
/// Look for new thread notes since our last fetch
fn find_new_notes(
notes: &[NoteRef],
id: TimelineCacheKey<'_>,
filters: &[Filter],
txn: &Transaction,
ndb: &Ndb,
) -> Vec<NoteRef> {
@@ -261,7 +212,7 @@ fn find_new_notes(
}
let last_note = notes[0];
let filters = id.filters_since(last_note.created_at + 1);
let filters = filter::make_filters_since(filters, last_note.created_at + 1);
if let Ok(results) = ndb.query(txn, &filters, 1000) {
debug!("got {} results from NotesHolder update", results.len());