Fix filter states when adding columns
This fixes various issues with filter states when adding columns. We now maintain multiple states per relay so that we don't lose track of anything. Fixes: https://github.com/damus-io/notedeck/issues/431 Fixes: https://github.com/damus-io/notedeck/issues/359 Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
333
src/app.rs
333
src/app.rs
@@ -6,20 +6,18 @@ use crate::{
|
||||
args::Args,
|
||||
column::Columns,
|
||||
draft::Drafts,
|
||||
error::{Error, FilterError},
|
||||
filter::{self, FilterState},
|
||||
filter::FilterState,
|
||||
frame_history::FrameHistory,
|
||||
imgcache::ImageCache,
|
||||
nav,
|
||||
note::NoteRef,
|
||||
notecache::{CachedNote, NoteCache},
|
||||
notecache::NoteCache,
|
||||
notes_holder::NotesHolderStorage,
|
||||
profile::Profile,
|
||||
storage::{self, DataPath, DataPathType, Directory, FileKeyStorage, KeyStorageType},
|
||||
subscriptions::{SubKind, Subscriptions},
|
||||
support::Support,
|
||||
thread::Thread,
|
||||
timeline::{Timeline, TimelineId, TimelineKind, ViewFilter},
|
||||
timeline::{self, Timeline, TimelineKind},
|
||||
ui::{self, DesktopSidePanel},
|
||||
unknowns::UnknownIds,
|
||||
view_state::ViewState,
|
||||
@@ -32,12 +30,12 @@ use uuid::Uuid;
|
||||
use egui::{Context, Frame, Style};
|
||||
use egui_extras::{Size, StripBuilder};
|
||||
|
||||
use nostrdb::{Config, Filter, Ndb, Note, Transaction};
|
||||
use nostrdb::{Config, Filter, Ndb, Transaction};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use tracing::{error, info, trace, warn};
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub enum DamusState {
|
||||
@@ -98,98 +96,6 @@ fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) {
|
||||
}
|
||||
}
|
||||
|
||||
fn send_initial_timeline_filter(
|
||||
ndb: &Ndb,
|
||||
can_since_optimize: bool,
|
||||
subs: &mut Subscriptions,
|
||||
pool: &mut RelayPool,
|
||||
timeline: &mut Timeline,
|
||||
to: &str,
|
||||
) {
|
||||
let filter_state = timeline.filter.clone();
|
||||
|
||||
match filter_state {
|
||||
FilterState::Broken(err) => {
|
||||
error!(
|
||||
"FetchingRemote state in broken state when sending initial timeline filter? {err}"
|
||||
);
|
||||
}
|
||||
|
||||
FilterState::FetchingRemote(_unisub) => {
|
||||
error!("FetchingRemote state when sending initial timeline filter?");
|
||||
}
|
||||
|
||||
FilterState::GotRemote(_sub) => {
|
||||
error!("GotRemote state when sending initial timeline filter?");
|
||||
}
|
||||
|
||||
FilterState::Ready(filter) => {
|
||||
let filter = filter.to_owned();
|
||||
let new_filters = filter.into_iter().map(|f| {
|
||||
// limit the size of remote filters
|
||||
let default_limit = filter::default_remote_limit();
|
||||
let mut lim = f.limit().unwrap_or(default_limit);
|
||||
let mut filter = f;
|
||||
if lim > default_limit {
|
||||
lim = default_limit;
|
||||
filter = filter.limit_mut(lim);
|
||||
}
|
||||
|
||||
let notes = timeline.notes(ViewFilter::NotesAndReplies);
|
||||
|
||||
// Should we since optimize? Not always. For example
|
||||
// if we only have a few notes locally. One way to
|
||||
// determine this is by looking at the current filter
|
||||
// and seeing what its limit is. If we have less
|
||||
// notes than the limit, we might want to backfill
|
||||
// older notes
|
||||
if can_since_optimize && filter::should_since_optimize(lim, notes.len()) {
|
||||
filter = filter::since_optimize_filter(filter, notes);
|
||||
} else {
|
||||
warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter);
|
||||
}
|
||||
|
||||
filter
|
||||
}).collect();
|
||||
|
||||
//let sub_id = damus.gen_subid(&SubKind::Initial);
|
||||
let sub_id = Uuid::new_v4().to_string();
|
||||
subs.subs.insert(sub_id.clone(), SubKind::Initial);
|
||||
|
||||
let cmd = ClientMessage::req(sub_id, new_filters);
|
||||
pool.send_to(&cmd, to);
|
||||
}
|
||||
|
||||
// we need some data first
|
||||
FilterState::NeedsRemote(filter) => {
|
||||
let sub_kind = SubKind::FetchingContactList(timeline.id);
|
||||
//let sub_id = damus.gen_subid(&sub_kind);
|
||||
let sub_id = Uuid::new_v4().to_string();
|
||||
let local_sub = ndb.subscribe(&filter).expect("sub");
|
||||
|
||||
timeline.filter = FilterState::fetching_remote(sub_id.clone(), local_sub);
|
||||
|
||||
subs.subs.insert(sub_id.clone(), sub_kind);
|
||||
|
||||
pool.subscribe(sub_id, filter.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn send_initial_filters(damus: &mut Damus, relay_url: &str) {
|
||||
info!("Sending initial filters to {}", relay_url);
|
||||
for timeline in damus.columns.timelines_mut() {
|
||||
send_initial_timeline_filter(
|
||||
&damus.ndb,
|
||||
damus.since_optimize,
|
||||
&mut damus.subscriptions,
|
||||
&mut damus.pool,
|
||||
timeline,
|
||||
relay_url,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_key_events(input: &egui::InputState, _pixels_per_point: f32, columns: &mut Columns) {
|
||||
for event in &input.raw.events {
|
||||
if let egui::Event::Key {
|
||||
@@ -225,17 +131,31 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
|
||||
};
|
||||
damus.pool.keepalive_ping(wakeup);
|
||||
|
||||
// pool stuff
|
||||
while let Some(ev) = damus.pool.try_recv() {
|
||||
let relay = ev.relay.to_owned();
|
||||
// NOTE: we don't use the while let loop due to borrow issues
|
||||
#[allow(clippy::while_let_loop)]
|
||||
loop {
|
||||
let ev = if let Some(ev) = damus.pool.try_recv() {
|
||||
ev.into_owned()
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
|
||||
match (&ev.event).into() {
|
||||
RelayEvent::Opened => send_initial_filters(damus, &relay),
|
||||
RelayEvent::Opened => {
|
||||
timeline::send_initial_timeline_filters(
|
||||
&damus.ndb,
|
||||
damus.since_optimize,
|
||||
&mut damus.columns,
|
||||
&mut damus.subscriptions,
|
||||
&mut damus.pool,
|
||||
&ev.relay,
|
||||
);
|
||||
}
|
||||
// TODO: handle reconnects
|
||||
RelayEvent::Closed => warn!("{} connection closed", &relay),
|
||||
RelayEvent::Error(e) => error!("{}: {}", &relay, e),
|
||||
RelayEvent::Closed => warn!("{} connection closed", &ev.relay),
|
||||
RelayEvent::Error(e) => error!("{}: {}", &ev.relay, e),
|
||||
RelayEvent::Other(msg) => trace!("other event {:?}", &msg),
|
||||
RelayEvent::Message(msg) => process_message(damus, &relay, &msg),
|
||||
RelayEvent::Message(msg) => process_message(damus, &ev.relay, &msg),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -243,9 +163,11 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
|
||||
for timeline_ind in 0..n_timelines {
|
||||
let is_ready = {
|
||||
let timeline = &mut damus.columns.timelines[timeline_ind];
|
||||
matches!(
|
||||
is_timeline_ready(&damus.ndb, &mut damus.pool, &mut damus.note_cache, timeline),
|
||||
Ok(true)
|
||||
timeline::is_timeline_ready(
|
||||
&damus.ndb,
|
||||
&mut damus.pool,
|
||||
&mut damus.note_cache,
|
||||
timeline,
|
||||
)
|
||||
};
|
||||
|
||||
@@ -285,183 +207,11 @@ fn unknown_id_send(damus: &mut Damus) {
|
||||
damus.pool.send(&msg);
|
||||
}
|
||||
|
||||
/// Check our timeline filter and see if we have any filter data ready.
|
||||
/// Our timelines may require additional data before it is functional. For
|
||||
/// example, when we have to fetch a contact list before we do the actual
|
||||
/// following list query.
|
||||
fn is_timeline_ready(
|
||||
ndb: &Ndb,
|
||||
pool: &mut RelayPool,
|
||||
note_cache: &mut NoteCache,
|
||||
timeline: &mut Timeline,
|
||||
) -> Result<bool> {
|
||||
let sub = match &timeline.filter {
|
||||
FilterState::GotRemote(sub) => *sub,
|
||||
FilterState::Ready(_f) => return Ok(true),
|
||||
_ => return Ok(false),
|
||||
};
|
||||
|
||||
// We got at least one eose for our filter request. Let's see
|
||||
// if nostrdb is done processing it yet.
|
||||
let res = ndb.poll_for_notes(sub, 1);
|
||||
if res.is_empty() {
|
||||
debug!(
|
||||
"check_timeline_filter_state: no notes found (yet?) for timeline {:?}",
|
||||
timeline
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
info!("notes found for contact timeline after GotRemote!");
|
||||
|
||||
let note_key = res[0];
|
||||
|
||||
let filter = {
|
||||
let txn = Transaction::new(ndb).expect("txn");
|
||||
let note = ndb.get_note_by_key(&txn, note_key).expect("note");
|
||||
filter::filter_from_tags(¬e).map(|f| f.into_follow_filter())
|
||||
};
|
||||
|
||||
// TODO: into_follow_filter is hardcoded to contact lists, let's generalize
|
||||
match filter {
|
||||
Err(Error::Filter(e)) => {
|
||||
error!("got broken when building filter {e}");
|
||||
timeline.filter = FilterState::broken(e);
|
||||
}
|
||||
Err(err) => {
|
||||
error!("got broken when building filter {err}");
|
||||
timeline.filter = FilterState::broken(FilterError::EmptyContactList);
|
||||
return Err(err);
|
||||
}
|
||||
Ok(filter) => {
|
||||
// we just switched to the ready state, we should send initial
|
||||
// queries and setup the local subscription
|
||||
info!("Found contact list! Setting up local and remote contact list query");
|
||||
setup_initial_timeline(ndb, timeline, note_cache, &filter).expect("setup init");
|
||||
timeline.filter = FilterState::ready(filter.clone());
|
||||
|
||||
//let ck = &timeline.kind;
|
||||
//let subid = damus.gen_subid(&SubKind::Column(ck.clone()));
|
||||
let subid = Uuid::new_v4().to_string();
|
||||
pool.subscribe(subid, filter)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
#[cfg(feature = "profiling")]
|
||||
fn setup_profiling() {
|
||||
puffin::set_scopes_on(true); // tell puffin to collect data
|
||||
}
|
||||
|
||||
fn setup_initial_timeline(
|
||||
ndb: &Ndb,
|
||||
timeline: &mut Timeline,
|
||||
note_cache: &mut NoteCache,
|
||||
filters: &[Filter],
|
||||
) -> Result<()> {
|
||||
timeline.subscription = Some(ndb.subscribe(filters)?);
|
||||
let txn = Transaction::new(ndb)?;
|
||||
debug!(
|
||||
"querying nostrdb sub {:?} {:?}",
|
||||
timeline.subscription, timeline.filter
|
||||
);
|
||||
let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32;
|
||||
let notes = ndb
|
||||
.query(&txn, filters, lim)?
|
||||
.into_iter()
|
||||
.map(NoteRef::from_query_result)
|
||||
.collect();
|
||||
|
||||
copy_notes_into_timeline(timeline, &txn, ndb, note_cache, notes);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn copy_notes_into_timeline(
|
||||
timeline: &mut Timeline,
|
||||
txn: &Transaction,
|
||||
ndb: &Ndb,
|
||||
note_cache: &mut NoteCache,
|
||||
notes: Vec<NoteRef>,
|
||||
) {
|
||||
let filters = {
|
||||
let views = &timeline.views;
|
||||
let filters: Vec<fn(&CachedNote, &Note) -> bool> =
|
||||
views.iter().map(|v| v.filter.filter()).collect();
|
||||
filters
|
||||
};
|
||||
|
||||
for note_ref in notes {
|
||||
for (view, filter) in filters.iter().enumerate() {
|
||||
if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) {
|
||||
if filter(
|
||||
note_cache.cached_note_or_insert_mut(note_ref.key, ¬e),
|
||||
¬e,
|
||||
) {
|
||||
timeline.views[view].notes.push(note_ref)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn setup_initial_nostrdb_subs(
|
||||
ndb: &Ndb,
|
||||
note_cache: &mut NoteCache,
|
||||
columns: &mut Columns,
|
||||
) -> Result<()> {
|
||||
for timeline in columns.timelines_mut() {
|
||||
setup_nostrdb_sub(ndb, note_cache, timeline)?
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn setup_nostrdb_sub(ndb: &Ndb, note_cache: &mut NoteCache, timeline: &mut Timeline) -> Result<()> {
|
||||
match &timeline.filter {
|
||||
FilterState::Ready(filters) => {
|
||||
{ setup_initial_timeline(ndb, timeline, note_cache, &filters.clone()) }?
|
||||
}
|
||||
|
||||
FilterState::Broken(err) => {
|
||||
error!("FetchingRemote state broken in setup_initial_nostr_subs: {err}")
|
||||
}
|
||||
FilterState::FetchingRemote(_) => {
|
||||
error!("FetchingRemote state in setup_initial_nostr_subs")
|
||||
}
|
||||
FilterState::GotRemote(_) => {
|
||||
error!("GotRemote state in setup_initial_nostr_subs")
|
||||
}
|
||||
FilterState::NeedsRemote(_filters) => {
|
||||
// can't do anything yet, we defer to first connect to send
|
||||
// remote filters
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn setup_new_nostrdb_sub(
|
||||
ndb: &Ndb,
|
||||
note_cache: &mut NoteCache,
|
||||
columns: &mut Columns,
|
||||
new_timeline_id: TimelineId,
|
||||
) -> Result<()> {
|
||||
if let Some(timeline) = columns.find_timeline_mut(new_timeline_id) {
|
||||
info!("Setting up timeline sub for {}", timeline.id);
|
||||
if let FilterState::Ready(filters) = &timeline.filter {
|
||||
for filter in filters {
|
||||
info!("Setting up filter {:?}", filter.json());
|
||||
}
|
||||
}
|
||||
setup_nostrdb_sub(ndb, note_cache, timeline)?
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_damus(damus: &mut Damus, ctx: &egui::Context) {
|
||||
match damus.state {
|
||||
DamusState::Initializing => {
|
||||
@@ -473,8 +223,12 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) {
|
||||
damus
|
||||
.subscriptions()
|
||||
.insert("unknownids".to_string(), SubKind::OneShot);
|
||||
setup_initial_nostrdb_subs(&damus.ndb, &mut damus.note_cache, &mut damus.columns)
|
||||
.expect("home subscription failed");
|
||||
timeline::setup_initial_nostrdb_subs(
|
||||
&damus.ndb,
|
||||
&mut damus.note_cache,
|
||||
&mut damus.columns,
|
||||
)
|
||||
.expect("home subscription failed");
|
||||
}
|
||||
|
||||
DamusState::Initialized => (),
|
||||
@@ -545,10 +299,12 @@ fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let filter_state = timeline.filter.get(relay_url);
|
||||
|
||||
// If this request was fetching a contact list, our filter
|
||||
// state should be "FetchingRemote". We look at the local
|
||||
// subscription for that filter state and get the subscription id
|
||||
let local_sub = if let FilterState::FetchingRemote(unisub) = &timeline.filter {
|
||||
let local_sub = if let FilterState::FetchingRemote(unisub) = filter_state {
|
||||
unisub.local
|
||||
} else {
|
||||
// TODO: we could have multiple contact list results, we need
|
||||
@@ -560,10 +316,17 @@ fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
info!(
|
||||
"got contact list from {}, updating filter_state to got_remote",
|
||||
relay_url
|
||||
);
|
||||
|
||||
// We take the subscription id and pass it to the new state of
|
||||
// "GotRemote". This will let future frames know that it can try
|
||||
// to look for the contact list in nostrdb.
|
||||
timeline.filter = FilterState::got_remote(local_sub);
|
||||
timeline
|
||||
.filter
|
||||
.set_relay_state(relay_url.to_string(), FilterState::got_remote(local_sub));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user