perf: coordinate unknown id lookups

This is a huge improvement over what it was before. Now all unknown id
lookups are debounced and happen through a central coordinator. This
ensures there is no duplication between timelines.

Fixes: https://github.com/damus-io/notedeck/issues/279
Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2024-09-02 17:35:59 -07:00
parent ad244d48c0
commit bc8a8d4a74
7 changed files with 324 additions and 224 deletions

View File

@@ -20,6 +20,7 @@ use crate::timeline::{Timeline, TimelineSource, ViewFilter};
use crate::ui::note::PostAction;
use crate::ui::{self, AccountSelectionWidget, DesktopGlobalPopup};
use crate::ui::{DesktopSidePanel, RelayView, View};
use crate::unknowns::UnknownIds;
use crate::{filter, Result};
use egui_nav::{Nav, NavAction};
use enostr::{ClientMessage, RelayEvent, RelayMessage, RelayPool};
@@ -30,10 +31,9 @@ use uuid::Uuid;
use egui::{Context, Frame, Style};
use egui_extras::{Size, StripBuilder};
use nostrdb::{BlockType, Config, Filter, Mention, Ndb, Note, NoteKey, Transaction};
use nostrdb::{Config, Filter, Ndb, Note, Transaction};
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
use tracing::{debug, error, info, trace, warn};
@@ -57,6 +57,7 @@ pub struct Damus {
pub selected_timeline: i32,
pub ndb: Ndb,
pub unknown_ids: UnknownIds,
pub drafts: Drafts,
pub threads: Threads,
pub img_cache: ImageCache,
@@ -262,12 +263,11 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
}
}
let mut unknown_ids: HashSet<UnknownId> = HashSet::new();
for timeline in 0..damus.timelines.len() {
let src = TimelineSource::column(timeline);
if let Ok(true) = is_timeline_ready(damus, timeline) {
if let Err(err) = src.poll_notes_into_view(damus, &mut unknown_ids) {
if let Err(err) = src.poll_notes_into_view(damus) {
error!("poll_notes_into_view: {err}");
}
} else {
@@ -275,21 +275,24 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
}
}
/*
let unknown_ids: Vec<UnknownId> = unknown_ids.into_iter().collect();
if let Some(filters) = get_unknown_ids_filter(&unknown_ids) {
info!(
"Getting {} unknown author profiles from relays",
unknown_ids.len()
);
let msg = ClientMessage::req("unknown_ids".to_string(), filters);
damus.pool.send(&msg);
if damus.unknown_ids.ready_to_send() {
unknown_id_send(damus);
}
*/
Ok(())
}
fn unknown_id_send(damus: &mut Damus) {
let filter = damus.unknown_ids.filter().expect("filter");
info!(
"Getting {} unknown ids from relays",
damus.unknown_ids.ids().len()
);
let msg = ClientMessage::req("unknownids".to_string(), filter);
damus.unknown_ids.clear();
damus.pool.send(&msg);
}
/// Check our timeline filter and see if we have any filter data ready.
/// Our timelines may require additional data before it is functional. For
/// example, when we have to fetch a contact list before we do the actual
@@ -345,117 +348,6 @@ fn is_timeline_ready(damus: &mut Damus, timeline: usize) -> Result<bool> {
Ok(true)
}
#[derive(Hash, Clone, Copy, PartialEq, Eq)]
pub enum UnknownId {
Pubkey([u8; 32]),
Id([u8; 32]),
}
impl UnknownId {
pub fn is_pubkey(&self) -> Option<&[u8; 32]> {
match self {
UnknownId::Pubkey(pk) => Some(pk),
_ => None,
}
}
pub fn is_id(&self) -> Option<&[u8; 32]> {
match self {
UnknownId::Id(id) => Some(id),
_ => None,
}
}
}
/// Look for missing notes in various parts of notes that we see:
///
/// - pubkeys and notes mentioned inside the note
/// - notes being replied to
///
/// We return all of this in a HashSet so that we can fetch these from
/// remote relays.
///
pub fn get_unknown_note_ids<'a>(
ndb: &Ndb,
cached_note: &CachedNote,
txn: &'a Transaction,
note: &Note<'a>,
note_key: NoteKey,
ids: &mut HashSet<UnknownId>,
) -> Result<()> {
// the author pubkey
if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
ids.insert(UnknownId::Pubkey(*note.pubkey()));
}
// pull notes that notes are replying to
if cached_note.reply.root.is_some() {
let note_reply = cached_note.reply.borrow(note.tags());
if let Some(root) = note_reply.root() {
if ndb.get_note_by_id(txn, root.id).is_err() {
ids.insert(UnknownId::Id(*root.id));
}
}
if !note_reply.is_reply_to_root() {
if let Some(reply) = note_reply.reply() {
if ndb.get_note_by_id(txn, reply.id).is_err() {
ids.insert(UnknownId::Id(*reply.id));
}
}
}
}
let blocks = ndb.get_blocks_by_key(txn, note_key)?;
for block in blocks.iter(note) {
if block.blocktype() != BlockType::MentionBech32 {
continue;
}
match block.as_mention().unwrap() {
Mention::Pubkey(npub) => {
if ndb.get_profile_by_pubkey(txn, npub.pubkey()).is_err() {
ids.insert(UnknownId::Pubkey(*npub.pubkey()));
}
}
Mention::Profile(nprofile) => {
if ndb.get_profile_by_pubkey(txn, nprofile.pubkey()).is_err() {
ids.insert(UnknownId::Pubkey(*nprofile.pubkey()));
}
}
Mention::Event(ev) => match ndb.get_note_by_id(txn, ev.id()) {
Err(_) => {
ids.insert(UnknownId::Id(*ev.id()));
if let Some(pk) = ev.pubkey() {
if ndb.get_profile_by_pubkey(txn, pk).is_err() {
ids.insert(UnknownId::Pubkey(*pk));
}
}
}
Ok(note) => {
if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
ids.insert(UnknownId::Pubkey(*note.pubkey()));
}
}
},
Mention::Note(note) => match ndb.get_note_by_id(txn, note.id()) {
Err(_) => {
ids.insert(UnknownId::Id(*note.id()));
}
Ok(note) => {
if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
ids.insert(UnknownId::Pubkey(*note.pubkey()));
}
}
},
_ => {}
}
}
Ok(())
}
#[cfg(feature = "profiling")]
fn setup_profiling() {
puffin::set_scopes_on(true); // tell puffin to collect data
@@ -529,6 +421,10 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) {
setup_profiling();
damus.state = DamusState::Initialized;
// this lets our eose handler know to close unknownids right away
damus
.subscriptions()
.insert("unknownids".to_string(), SubKind::OneShot);
setup_initial_nostrdb_subs(damus).expect("home subscription failed");
}
@@ -547,89 +443,25 @@ fn process_event(damus: &mut Damus, _subid: &str, event: &str) {
}
}
fn get_unknown_ids(txn: &Transaction, damus: &mut Damus) -> Result<Vec<UnknownId>> {
#[cfg(feature = "profiling")]
puffin::profile_function!();
let mut ids: HashSet<UnknownId> = HashSet::new();
let mut new_cached_notes: Vec<(NoteKey, CachedNote)> = vec![];
for timeline in &damus.timelines {
for noteref in timeline.notes(ViewFilter::NotesAndReplies) {
let note = damus.ndb.get_note_by_key(txn, noteref.key)?;
let note_key = note.key().unwrap();
let cached_note = damus.note_cache().cached_note(noteref.key);
let cached_note = if let Some(cn) = cached_note {
cn.clone()
} else {
let new_cached_note = CachedNote::new(&note);
new_cached_notes.push((note_key, new_cached_note.clone()));
new_cached_note
};
let _ = get_unknown_note_ids(
&damus.ndb,
&cached_note,
txn,
&note,
note.key().unwrap(),
&mut ids,
);
}
}
// This is mainly done to avoid the double mutable borrow that would happen
// if we tried to update the note_cache mutably in the loop above
for (note_key, note) in new_cached_notes {
damus.note_cache_mut().cache_mut().insert(note_key, note);
}
Ok(ids.into_iter().collect())
}
fn get_unknown_ids_filter(ids: &[UnknownId]) -> Option<Vec<Filter>> {
if ids.is_empty() {
return None;
}
let ids = &ids[0..500.min(ids.len())];
let mut filters: Vec<Filter> = vec![];
let pks: Vec<&[u8; 32]> = ids.iter().flat_map(|id| id.is_pubkey()).collect();
if !pks.is_empty() {
let pk_filter = Filter::new().authors(pks).kinds([0]).build();
filters.push(pk_filter);
}
let note_ids: Vec<&[u8; 32]> = ids.iter().flat_map(|id| id.is_id()).collect();
if !note_ids.is_empty() {
filters.push(Filter::new().ids(note_ids).build());
}
Some(filters)
}
fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> {
let sub_kind = if let Some(sub_kind) = damus.subscriptions().get(subid) {
sub_kind
} else {
warn!("got unknown eose subid {}", subid);
let n_subids = damus.subscriptions().len();
warn!(
"got unknown eose subid {}, {} tracked subscriptions",
subid, n_subids
);
return Ok(());
};
match *sub_kind {
SubKind::Initial => {
let txn = Transaction::new(&damus.ndb)?;
let ids = get_unknown_ids(&txn, damus)?;
if let Some(filters) = get_unknown_ids_filter(&ids) {
info!("Getting {} unknown ids from {}", ids.len(), relay_url);
let sub_id = Uuid::new_v4().to_string();
let msg = ClientMessage::req(sub_id.clone(), filters);
// unknownids are a oneshot request
damus.subscriptions().insert(sub_id, SubKind::OneShot);
damus.pool.send_to(&msg, relay_url);
UnknownIds::update(&txn, damus);
// this is possible if this is the first time
if damus.unknown_ids.ready_to_send() {
unknown_id_send(damus);
}
}
@@ -807,6 +639,7 @@ impl Damus {
Self {
pool,
is_mobile,
unknown_ids: UnknownIds::default(),
subscriptions: Subscriptions::default(),
since_optimize: parsed_args.since_optimize,
threads: Threads::default(),
@@ -841,6 +674,7 @@ impl Damus {
config.set_ingester_threads(2);
Self {
is_mobile,
unknown_ids: UnknownIds::default(),
subscriptions: Subscriptions::default(),
since_optimize: true,
threads: Threads::default(),