Merge initial threads

This commit is contained in:
William Casarin
2024-08-16 11:51:42 -07:00
18 changed files with 1264 additions and 480 deletions

View File

@@ -1,8 +1,8 @@
use crate::account_manager::AccountManager;
use crate::actionbar::BarResult;
use crate::app_creation::setup_cc;
use crate::app_style::user_requested_visuals_change;
use crate::draft::Drafts;
use crate::error::Error;
use crate::frame_history::FrameHistory;
use crate::imgcache::ImageCache;
use crate::key_storage::KeyStorageType;
@@ -10,8 +10,8 @@ use crate::note::NoteRef;
use crate::notecache::{CachedNote, NoteCache};
use crate::relay_pool_manager::RelayPoolManager;
use crate::route::Route;
use crate::timeline;
use crate::timeline::{MergeKind, Timeline, ViewFilter};
use crate::thread::{DecrementResult, Threads};
use crate::timeline::{Timeline, TimelineSource, ViewFilter};
use crate::ui::note::PostAction;
use crate::ui::{self, AccountSelectionWidget, DesktopGlobalPopup};
use crate::ui::{DesktopSidePanel, RelayView, View};
@@ -53,10 +53,11 @@ pub struct Damus {
pub timelines: Vec<Timeline>,
pub selected_timeline: i32,
pub drafts: Drafts,
pub img_cache: ImageCache,
pub ndb: Ndb,
pub drafts: Drafts,
pub threads: Threads,
pub img_cache: ImageCache,
pub account_manager: AccountManager,
frame_history: crate::frame_history::FrameHistory,
@@ -93,27 +94,6 @@ fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) {
/// notes locally. One way to determine this is by looking at the current filter
/// and seeing what its limit is. If we have less notes than the limit,
/// we might want to backfill older notes
fn should_since_optimize(limit: Option<u16>, num_notes: usize) -> bool {
let limit = limit.unwrap_or(enostr::Filter::default_limit()) as usize;
// rough heuristic for bailing since optimization if we don't have enough notes
limit <= num_notes
}
fn since_optimize_filter(filter: &mut enostr::Filter, notes: &[NoteRef]) {
// Get the latest entry in the events
if notes.is_empty() {
return;
}
// get the latest note
let latest = notes[0];
let since = latest.created_at - 60;
// update the filters
filter.since = Some(since);
}
fn send_initial_filters(damus: &mut Damus, relay_url: &str) {
info!("Sending initial filters to {}", relay_url);
let mut c: u32 = 1;
@@ -132,8 +112,8 @@ fn send_initial_filters(damus: &mut Damus, relay_url: &str) {
}
let notes = timeline.notes(ViewFilter::NotesAndReplies);
if should_since_optimize(f.limit, notes.len()) {
since_optimize_filter(f, notes);
if crate::filter::should_since_optimize(f.limit, notes.len()) {
crate::filter::since_optimize_filter(f, notes);
} else {
warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", f);
}
@@ -229,7 +209,8 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
let txn = Transaction::new(&damus.ndb)?;
let mut unknown_ids: HashSet<UnknownId> = HashSet::new();
for timeline in 0..damus.timelines.len() {
if let Err(err) = poll_notes_for_timeline(damus, &txn, timeline, &mut unknown_ids) {
let src = TimelineSource::column(timeline);
if let Err(err) = src.poll_notes_into_view(damus, &txn, &mut unknown_ids) {
error!("{}", err);
}
}
@@ -248,7 +229,7 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
}
#[derive(Hash, Clone, Copy, PartialEq, Eq)]
enum UnknownId<'a> {
pub enum UnknownId<'a> {
Pubkey(&'a [u8; 32]),
Id(&'a [u8; 32]),
}
@@ -277,9 +258,9 @@ impl<'a> UnknownId<'a> {
/// We return all of this in a HashSet so that we can fetch these from
/// remote relays.
///
fn get_unknown_note_ids<'a>(
pub fn get_unknown_note_ids<'a>(
ndb: &Ndb,
_cached_note: &CachedNote,
cached_note: &CachedNote,
txn: &'a Transaction,
note: &Note<'a>,
note_key: NoteKey,
@@ -292,7 +273,6 @@ fn get_unknown_note_ids<'a>(
}
// pull notes that notes are replying to
/* TODO: FIX tags lifetime
if cached_note.reply.root.is_some() {
let note_reply = cached_note.reply.borrow(note.tags());
if let Some(root) = note_reply.root() {
@@ -309,7 +289,6 @@ fn get_unknown_note_ids<'a>(
}
}
}
*/
let blocks = ndb.get_blocks_by_key(txn, note_key)?;
for block in blocks.iter(note) {
@@ -360,101 +339,6 @@ fn get_unknown_note_ids<'a>(
Ok(())
}
fn poll_notes_for_timeline<'a>(
damus: &mut Damus,
txn: &'a Transaction,
timeline_ind: usize,
ids: &mut HashSet<UnknownId<'a>>,
) -> Result<()> {
let sub = if let Some(sub) = &damus.timelines[timeline_ind].subscription {
sub
} else {
return Err(Error::NoActiveSubscription);
};
let new_note_ids = damus.ndb.poll_for_notes(sub.id, 100);
if new_note_ids.is_empty() {
return Ok(());
} else {
debug!("{} new notes! {:?}", new_note_ids.len(), new_note_ids);
}
let mut new_refs: Vec<(Note, NoteRef)> = Vec::with_capacity(new_note_ids.len());
for key in new_note_ids {
let note = if let Ok(note) = damus.ndb.get_note_by_key(txn, key) {
note
} else {
error!("hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", key);
continue;
};
let cached_note = damus
.note_cache_mut()
.cached_note_or_insert(key, &note)
.clone();
let _ = get_unknown_note_ids(&damus.ndb, &cached_note, txn, &note, key, ids);
let created_at = note.created_at();
new_refs.push((note, NoteRef { key, created_at }));
}
// ViewFilter::NotesAndReplies
{
let refs: Vec<NoteRef> = new_refs.iter().map(|(_note, nr)| *nr).collect();
insert_notes_into_timeline(damus, timeline_ind, ViewFilter::NotesAndReplies, &refs)
}
//
// handle the filtered case (ViewFilter::Notes, no replies)
//
// TODO(jb55): this is mostly just copied from above, let's just use a loop
// I initially tried this but ran into borrow checker issues
{
let mut filtered_refs = Vec::with_capacity(new_refs.len());
for (note, nr) in &new_refs {
let cached_note = damus.note_cache_mut().cached_note_or_insert(nr.key, note);
if ViewFilter::filter_notes(cached_note, note) {
filtered_refs.push(*nr);
}
}
insert_notes_into_timeline(damus, timeline_ind, ViewFilter::Notes, &filtered_refs);
}
Ok(())
}
fn insert_notes_into_timeline(
app: &mut Damus,
timeline_ind: usize,
filter: ViewFilter,
new_refs: &[NoteRef],
) {
let timeline = &mut app.timelines[timeline_ind];
let num_prev_items = timeline.notes(filter).len();
let (notes, merge_kind) = timeline::merge_sorted_vecs(timeline.notes(filter), new_refs);
debug!(
"got merge kind {:?} for {:?} on timeline {}",
merge_kind, filter, timeline_ind
);
timeline.view_mut(filter).notes = notes;
let new_items = timeline.notes(filter).len() - num_prev_items;
// TODO: technically items could have been added inbetween
if new_items > 0 {
let mut list = app.timelines[timeline_ind].view(filter).list.borrow_mut();
match merge_kind {
// TODO: update egui_virtual_list to support spliced inserts
MergeKind::Spliced => list.reset(),
MergeKind::FrontInsert => list.items_inserted_at_start(new_items),
}
}
}
#[cfg(feature = "profiling")]
fn setup_profiling() {
puffin::set_scopes_on(true); // tell puffin to collect data
@@ -762,6 +646,7 @@ fn parse_args(args: &[String]) -> Args {
res
}
/*
fn determine_key_storage_type() -> KeyStorageType {
#[cfg(target_os = "macos")]
{
@@ -778,6 +663,7 @@ fn determine_key_storage_type() -> KeyStorageType {
KeyStorageType::None
}
}
*/
impl Damus {
/// Called once before the first frame.
@@ -808,7 +694,7 @@ impl Damus {
// TODO: should pull this from settings
None,
// TODO: use correct KeyStorage mechanism for current OS arch
determine_key_storage_type(),
KeyStorageType::None,
);
for key in parsed_args.keys {
@@ -843,6 +729,7 @@ impl Damus {
Self {
pool,
is_mobile,
threads: Threads::default(),
drafts: Drafts::default(),
state: DamusState::Initializing,
img_cache: ImageCache::new(imgcache_dir),
@@ -872,6 +759,7 @@ impl Damus {
config.set_ingester_threads(2);
Self {
is_mobile,
threads: Threads::default(),
drafts: Drafts::default(),
state: DamusState::Initializing,
pool: RelayPool::new(),
@@ -1015,6 +903,53 @@ fn render_panel(ctx: &egui::Context, app: &mut Damus, timeline_ind: usize) {
});
}
/// Local thread unsubscribe
fn thread_unsubscribe(app: &mut Damus, id: &[u8; 32]) {
let unsubscribe = {
let txn = Transaction::new(&app.ndb).expect("txn");
let root_id = crate::note::root_note_id_from_selected_id(app, &txn, id);
let thread = app.threads.thread_mut(&app.ndb, &txn, root_id).get_ptr();
let unsub = thread.decrement_sub();
if let Ok(DecrementResult::LastSubscriber(_subid)) = unsub {
*thread.subscription_mut() = None;
}
unsub
};
match unsubscribe {
Ok(DecrementResult::LastSubscriber(sub_id)) => {
if let Err(e) = app.ndb.unsubscribe(sub_id) {
error!("failed to unsubscribe from thread: {e}, subid:{sub_id}, {} active subscriptions", app.ndb.subscription_count());
} else {
info!(
"Unsubscribed from thread subid:{}. {} active subscriptions",
sub_id,
app.ndb.subscription_count()
);
}
}
Ok(DecrementResult::ActiveSubscribers) => {
info!(
"Keeping thread subscription. {} active subscriptions.",
app.ndb.subscription_count()
);
// do nothing
}
Err(e) => {
// something is wrong!
error!(
"Thread unsubscribe error: {e}. {} active subsciptions.",
app.ndb.subscription_count()
);
}
}
}
fn render_nav(routes: Vec<Route>, timeline_ind: usize, app: &mut Damus, ui: &mut egui::Ui) {
let navigating = app.timelines[timeline_ind].navigating;
let returning = app.timelines[timeline_ind].returning;
@@ -1027,7 +962,7 @@ fn render_nav(routes: Vec<Route>, timeline_ind: usize, app: &mut Damus, ui: &mut
.show(ui, |ui, nav| match nav.top() {
Route::Timeline(_n) => {
let app = &mut app_ctx.borrow_mut();
timeline::timeline_view(ui, app, timeline_ind);
ui::TimelineView::new(app, timeline_ind).ui(ui);
None
}
@@ -1036,11 +971,6 @@ fn render_nav(routes: Vec<Route>, timeline_ind: usize, app: &mut Damus, ui: &mut
None
}
Route::Thread(_key) => {
ui.label("thread view");
None
}
Route::Relays => {
let pool = &mut app_ctx.borrow_mut().pool;
let manager = RelayPoolManager::new(pool);
@@ -1048,6 +978,22 @@ fn render_nav(routes: Vec<Route>, timeline_ind: usize, app: &mut Damus, ui: &mut
None
}
Route::Thread(id) => {
let app = &mut app_ctx.borrow_mut();
let result = ui::ThreadView::new(app, timeline_ind, id.bytes()).ui(ui);
if let Some(bar_result) = result {
match bar_result {
BarResult::NewThreadNotes(new_notes) => {
let thread = app.threads.thread_expected_mut(new_notes.root_id.bytes());
new_notes.process(thread);
}
}
}
None
}
Route::Reply(id) => {
let mut app = app_ctx.borrow_mut();
@@ -1076,18 +1022,21 @@ fn render_nav(routes: Vec<Route>, timeline_ind: usize, app: &mut Damus, ui: &mut
}
});
let mut app = app_ctx.borrow_mut();
if let Some(reply_response) = nav_response.inner {
if let Some(PostAction::Post(_np)) = reply_response.inner.action {
app_ctx.borrow_mut().timelines[timeline_ind].returning = true;
app.timelines[timeline_ind].returning = true;
}
}
if let Some(NavAction::Returned) = nav_response.action {
let mut app = app_ctx.borrow_mut();
app.timelines[timeline_ind].routes.pop();
let popped = app.timelines[timeline_ind].routes.pop();
if let Some(Route::Thread(id)) = popped {
thread_unsubscribe(&mut app, id.bytes());
}
app.timelines[timeline_ind].returning = false;
} else if let Some(NavAction::Navigated) = nav_response.action {
app_ctx.borrow_mut().timelines[timeline_ind].navigating = false;
app.timelines[timeline_ind].navigating = false;
}
}