use AccountSubs for timeline contact sub

Signed-off-by: kernelkind <kernelkind@gmail.com>
This commit is contained in:
kernelkind
2025-07-12 16:48:20 -04:00
parent 44edffc596
commit 46633d0513
4 changed files with 111 additions and 58 deletions

View File

@@ -37,10 +37,18 @@ impl FilterStates {
}
}
pub fn get_any_gotremote(&self) -> Option<(&str, Subscription)> {
pub fn get_any_gotremote(&self) -> Option<GotRemoteResult> {
for (k, v) in self.states.iter() {
if let FilterState::GotRemote(sub) = v {
return Some((k, *sub));
if let FilterState::GotRemote(item_type) = v {
return match item_type {
GotRemoteType::Normal(subscription) => Some(GotRemoteResult::Normal {
relay_id: k.to_owned(),
sub_id: *subscription,
}),
GotRemoteType::Contact => Some(GotRemoteResult::Contact {
relay_id: k.to_owned(),
}),
};
}
}
@@ -85,12 +93,34 @@ impl FilterStates {
#[derive(Debug, Clone)]
pub enum FilterState {
NeedsRemote(Vec<Filter>),
FetchingRemote(UnifiedSubscription),
GotRemote(Subscription),
FetchingRemote(FetchingRemoteType),
GotRemote(GotRemoteType),
Ready(Vec<Filter>),
Broken(FilterError),
}
pub enum GotRemoteResult {
Normal {
relay_id: String,
sub_id: Subscription,
},
Contact {
relay_id: String,
},
}
#[derive(Debug, Clone)]
pub enum FetchingRemoteType {
Normal(UnifiedSubscription),
Contact,
}
#[derive(Debug, Clone)]
pub enum GotRemoteType {
Normal(Subscription),
Contact,
}
impl FilterState {
/// We tried to fetch a filter but we wither got no data or the data
/// was corrupted, preventing us from getting to the Ready state.
@@ -116,7 +146,7 @@ impl FilterState {
/// We got the remote data. Local data should be available to build
/// the filter for the [`FilterState::Ready`] state
pub fn got_remote(local_sub: Subscription) -> Self {
Self::GotRemote(local_sub)
Self::GotRemote(GotRemoteType::Normal(local_sub))
}
/// We have sent off a remote subscription to get data needed for the
@@ -126,7 +156,7 @@ impl FilterState {
local: local_sub,
remote: sub_id,
};
Self::FetchingRemote(unified_sub)
Self::FetchingRemote(FetchingRemoteType::Normal(unified_sub))
}
}

View File

@@ -117,12 +117,12 @@ fn try_process_event(
.send_initial_filters(app_ctx.pool, &ev.relay);
timeline::send_initial_timeline_filters(
app_ctx.ndb,
damus.since_optimize,
&mut damus.timeline_cache,
&mut damus.subscriptions,
app_ctx.pool,
&ev.relay,
app_ctx.accounts,
);
}
// TODO: handle reconnects
@@ -136,8 +136,13 @@ fn try_process_event(
}
for (_kind, timeline) in damus.timeline_cache.timelines.iter_mut() {
let is_ready =
timeline::is_timeline_ready(app_ctx.ndb, app_ctx.pool, app_ctx.note_cache, timeline);
let is_ready = timeline::is_timeline_ready(
app_ctx.ndb,
app_ctx.pool,
app_ctx.note_cache,
timeline,
app_ctx.accounts,
);
if is_ready {
let txn = Transaction::new(app_ctx.ndb).expect("txn");
@@ -260,12 +265,7 @@ fn handle_eose(
let filter_state = timeline.filter.get_mut(relay_url);
// If this request was fetching a contact list, our filter
// state should be "FetchingRemote". We look at the local
// subscription for that filter state and get the subscription id
let local_sub = if let FilterState::FetchingRemote(unisub) = filter_state {
unisub.local
} else {
let FilterState::FetchingRemote(fetching_remote_type) = filter_state else {
// TODO: we could have multiple contact list results, we need
// to check to see if this one is newer and use that instead
warn!(
@@ -275,17 +275,21 @@ fn handle_eose(
return Ok(());
};
info!(
"got contact list from {}, updating filter_state to got_remote",
relay_url
);
let new_filter_state = match fetching_remote_type {
notedeck::filter::FetchingRemoteType::Normal(unified_subscription) => {
FilterState::got_remote(unified_subscription.local)
}
notedeck::filter::FetchingRemoteType::Contact => {
FilterState::GotRemote(notedeck::filter::GotRemoteType::Contact)
}
};
// We take the subscription id and pass it to the new state of
// "GotRemote". This will let future frames know that it can try
// to look for the contact list in nostrdb.
timeline
.filter
.set_relay_state(relay_url.to_string(), FilterState::got_remote(local_sub));
.set_relay_state(relay_url.to_string(), new_filter_state);
}
}

View File

@@ -7,7 +7,8 @@ use crate::{
};
use notedeck::{
filter, CachedNote, FilterError, FilterState, FilterStates, NoteCache, NoteRef, UnknownIds,
filter, Accounts, CachedNote, ContactState, FilterError, FilterState, FilterStates, NoteCache,
NoteRef, UnknownIds,
};
use egui_virtual_list::VirtualList;
@@ -474,16 +475,17 @@ pub fn setup_new_timeline(
pool: &mut RelayPool,
note_cache: &mut NoteCache,
since_optimize: bool,
accounts: &Accounts,
) {
// if we're ready, setup local subs
if is_timeline_ready(ndb, pool, note_cache, timeline) {
if is_timeline_ready(ndb, pool, note_cache, timeline, accounts) {
if let Err(err) = setup_timeline_nostrdb_sub(ndb, txn, note_cache, timeline) {
error!("setup_new_timeline: {err}");
}
}
for relay in &mut pool.relays {
send_initial_timeline_filter(ndb, since_optimize, subs, relay, timeline);
send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts);
}
}
@@ -492,29 +494,29 @@ pub fn setup_new_timeline(
/// situations where you are adding a new timeline, use
/// setup_new_timeline.
pub fn send_initial_timeline_filters(
ndb: &Ndb,
since_optimize: bool,
timeline_cache: &mut TimelineCache,
subs: &mut Subscriptions,
pool: &mut RelayPool,
relay_id: &str,
accounts: &Accounts,
) -> Option<()> {
info!("Sending initial filters to {}", relay_id);
let relay = &mut pool.relays.iter_mut().find(|r| r.url() == relay_id)?;
for (_kind, timeline) in timeline_cache.timelines.iter_mut() {
send_initial_timeline_filter(ndb, since_optimize, subs, relay, timeline);
send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts);
}
Some(())
}
pub fn send_initial_timeline_filter(
ndb: &Ndb,
can_since_optimize: bool,
subs: &mut Subscriptions,
relay: &mut PoolRelay,
timeline: &mut Timeline,
accounts: &Accounts,
) {
let filter_state = timeline.filter.get_mut(relay.url());
@@ -572,34 +574,34 @@ pub fn send_initial_timeline_filter(
}
// we need some data first
FilterState::NeedsRemote(filter) => {
fetch_contact_list(filter.to_owned(), ndb, subs, relay, timeline)
}
FilterState::NeedsRemote(_) => fetch_contact_list(subs, relay, timeline, accounts),
}
}
pub fn fetch_contact_list(
filter: Vec<Filter>,
ndb: &Ndb,
subs: &mut Subscriptions,
relay: &mut PoolRelay,
timeline: &mut Timeline,
accounts: &Accounts,
) {
let sub_kind = SubKind::FetchingContactList(timeline.kind.clone());
let sub_id = subscriptions::new_sub_id();
let local_sub = ndb.subscribe(&filter).expect("sub");
let sub = &accounts.get_subs().contacts;
timeline.filter.set_relay_state(
relay.url().to_string(),
FilterState::fetching_remote(sub_id.clone(), local_sub),
);
let new_filter_state = match accounts.get_selected_account().data.contacts.get_state() {
ContactState::Unreceived => {
FilterState::FetchingRemote(filter::FetchingRemoteType::Contact)
}
ContactState::Received {
contacts: _,
note_key: _,
} => FilterState::GotRemote(filter::GotRemoteType::Contact),
};
subs.subs.insert(sub_id.clone(), sub_kind);
timeline
.filter
.set_relay_state(relay.url().to_string(), new_filter_state);
info!("fetching contact list from {}", relay.url());
if let Err(err) = relay.subscribe(sub_id, filter) {
error!("error subscribing: {err}");
}
subs.subs.insert(sub.remote.clone(), sub_kind);
}
fn setup_initial_timeline(
@@ -688,6 +690,7 @@ pub fn is_timeline_ready(
pool: &mut RelayPool,
note_cache: &mut NoteCache,
timeline: &mut Timeline,
accounts: &Accounts,
) -> bool {
// TODO: we should debounce the filter states a bit to make sure we have
// seen all of the different contact lists from each relay
@@ -695,26 +698,40 @@ pub fn is_timeline_ready(
return true;
}
let (relay_id, sub) = if let Some((relay_id, sub)) = timeline.filter.get_any_gotremote() {
(relay_id.to_string(), sub)
} else {
let Some(res) = timeline.filter.get_any_gotremote() else {
return false;
};
// We got at least one eose for our filter request. Let's see
// if nostrdb is done processing it yet.
let res = ndb.poll_for_notes(sub, 1);
if res.is_empty() {
debug!(
"check_timeline_filter_state: no notes found (yet?) for timeline {:?}",
timeline
);
return false;
}
let (relay_id, note_key) = match res {
filter::GotRemoteResult::Normal { relay_id, sub_id } => {
// We got at least one eose for our filter request. Let's see
// if nostrdb is done processing it yet.
let res = ndb.poll_for_notes(sub_id, 1);
if res.is_empty() {
debug!(
"check_timeline_filter_state: no notes found (yet?) for timeline {:?}",
timeline
);
return false;
}
info!("notes found for contact timeline after GotRemote!");
info!("notes found for contact timeline after GotRemote!");
(relay_id, res[0])
}
filter::GotRemoteResult::Contact { relay_id } => {
let ContactState::Received {
contacts: _,
note_key,
} = accounts.get_selected_account().data.contacts.get_state()
else {
return false;
};
(relay_id, *note_key)
}
};
let note_key = res[0];
let with_hashtags = false;
let filter = {

View File

@@ -623,6 +623,7 @@ pub fn render_add_column_routes(
ctx.pool,
ctx.note_cache,
app.since_optimize,
ctx.accounts,
);
app.columns_mut(ctx.accounts)
@@ -664,6 +665,7 @@ pub fn render_add_column_routes(
ctx.pool,
ctx.note_cache,
app.since_optimize,
ctx.accounts,
);
app.columns_mut(ctx.accounts)