fetch contact lists
If we don't have a contact list, make sure to fetch one Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
448
src/app.rs
448
src/app.rs
@@ -5,6 +5,8 @@ use crate::app_style::user_requested_visuals_change;
|
||||
use crate::args::Args;
|
||||
use crate::column::ColumnKind;
|
||||
use crate::draft::Drafts;
|
||||
use crate::error::{Error, FilterError};
|
||||
use crate::filter::FilterState;
|
||||
use crate::frame_history::FrameHistory;
|
||||
use crate::imgcache::ImageCache;
|
||||
use crate::key_storage::KeyStorageType;
|
||||
@@ -12,6 +14,7 @@ use crate::note::NoteRef;
|
||||
use crate::notecache::{CachedNote, NoteCache};
|
||||
use crate::relay_pool_manager::RelayPoolManager;
|
||||
use crate::route::Route;
|
||||
use crate::subscriptions::{SubKind, Subscriptions};
|
||||
use crate::thread::{DecrementResult, Threads};
|
||||
use crate::timeline::{Timeline, TimelineSource, ViewFilter};
|
||||
use crate::ui::note::PostAction;
|
||||
@@ -22,13 +25,14 @@ use egui_nav::{Nav, NavAction};
|
||||
use enostr::{ClientMessage, RelayEvent, RelayMessage, RelayPool};
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
use uuid::Uuid;
|
||||
|
||||
use egui::{Context, Frame, Style};
|
||||
use egui_extras::{Size, StripBuilder};
|
||||
|
||||
use nostrdb::{BlockType, Config, Filter, Mention, Ndb, Note, NoteKey, Transaction};
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::hash::Hash;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
@@ -57,6 +61,7 @@ pub struct Damus {
|
||||
pub threads: Threads,
|
||||
pub img_cache: ImageCache,
|
||||
pub account_manager: AccountManager,
|
||||
pub subscriptions: Subscriptions,
|
||||
|
||||
frame_history: crate::frame_history::FrameHistory,
|
||||
|
||||
@@ -93,54 +98,87 @@ fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) {
|
||||
}
|
||||
}
|
||||
|
||||
fn send_initial_filters(damus: &mut Damus, relay_url: &str) {
|
||||
info!("Sending initial filters to {}", relay_url);
|
||||
let mut c: u32 = 1;
|
||||
fn send_initial_timeline_filter(damus: &mut Damus, timeline: usize) {
|
||||
let can_since_optimize = damus.since_optimize;
|
||||
|
||||
for relay in &mut damus.pool.relays {
|
||||
let relay = &mut relay.relay;
|
||||
if relay.url == relay_url {
|
||||
for timeline in &damus.timelines {
|
||||
let filter = if let Some(filter) = &timeline.filter {
|
||||
filter.clone()
|
||||
} else {
|
||||
// TODO: handle unloaded filters
|
||||
error!("TODO: handle unloaded filters");
|
||||
continue;
|
||||
};
|
||||
let filter_state = damus.timelines[timeline].filter.clone();
|
||||
|
||||
let new_filters = filter.into_iter().map(|f| {
|
||||
// limit the size of remote filters
|
||||
let default_limit = filter::default_remote_limit();
|
||||
let mut lim = f.limit().unwrap_or(default_limit);
|
||||
let mut filter = f;
|
||||
if lim > default_limit {
|
||||
lim = default_limit;
|
||||
filter = filter.limit_mut(lim);
|
||||
}
|
||||
|
||||
let notes = timeline.notes(ViewFilter::NotesAndReplies);
|
||||
|
||||
// Should we since optimize? Not always. For example
|
||||
// if we only have a few notes locally. One way to
|
||||
// determine this is by looking at the current filter
|
||||
// and seeing what its limit is. If we have less
|
||||
// notes than the limit, we might want to backfill
|
||||
// older notes
|
||||
if can_since_optimize && filter::should_since_optimize(lim, notes.len()) {
|
||||
filter = filter::since_optimize_filter(filter, notes);
|
||||
} else {
|
||||
warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter);
|
||||
}
|
||||
|
||||
filter
|
||||
}).collect();
|
||||
relay.subscribe(format!("initial{}", c), new_filters);
|
||||
c += 1;
|
||||
}
|
||||
return;
|
||||
match filter_state {
|
||||
FilterState::Broken(err) => {
|
||||
error!(
|
||||
"FetchingRemote state in broken state when sending initial timeline filter? {err}"
|
||||
);
|
||||
}
|
||||
|
||||
FilterState::FetchingRemote(_unisub) => {
|
||||
error!("FetchingRemote state when sending initial timeline filter?");
|
||||
}
|
||||
|
||||
FilterState::GotRemote(_sub) => {
|
||||
error!("GotRemote state when sending initial timeline filter?");
|
||||
}
|
||||
|
||||
FilterState::Ready(filter) => {
|
||||
let filter = filter.to_owned();
|
||||
let new_filters = filter.into_iter().map(|f| {
|
||||
// limit the size of remote filters
|
||||
let default_limit = filter::default_remote_limit();
|
||||
let mut lim = f.limit().unwrap_or(default_limit);
|
||||
let mut filter = f;
|
||||
if lim > default_limit {
|
||||
lim = default_limit;
|
||||
filter = filter.limit_mut(lim);
|
||||
}
|
||||
|
||||
let notes = damus.timelines[timeline].notes(ViewFilter::NotesAndReplies);
|
||||
|
||||
// Should we since optimize? Not always. For example
|
||||
// if we only have a few notes locally. One way to
|
||||
// determine this is by looking at the current filter
|
||||
// and seeing what its limit is. If we have less
|
||||
// notes than the limit, we might want to backfill
|
||||
// older notes
|
||||
if can_since_optimize && filter::should_since_optimize(lim, notes.len()) {
|
||||
filter = filter::since_optimize_filter(filter, notes);
|
||||
} else {
|
||||
warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter);
|
||||
}
|
||||
|
||||
filter
|
||||
}).collect();
|
||||
|
||||
let sub_id = Uuid::new_v4().to_string();
|
||||
damus
|
||||
.subscriptions()
|
||||
.insert(sub_id.clone(), SubKind::Initial);
|
||||
|
||||
damus.pool.subscribe(sub_id, new_filters);
|
||||
}
|
||||
|
||||
// we need some data first
|
||||
FilterState::NeedsRemote(filter) => {
|
||||
let sub_id = Uuid::new_v4().to_string();
|
||||
let uid = damus.timelines[timeline].uid;
|
||||
let local_sub = damus.ndb.subscribe(&filter).expect("sub");
|
||||
|
||||
damus.timelines[timeline].filter =
|
||||
FilterState::fetching_remote(sub_id.clone(), local_sub);
|
||||
|
||||
damus
|
||||
.subscriptions()
|
||||
.insert(sub_id.clone(), SubKind::FetchingContactList(uid));
|
||||
|
||||
damus.pool.subscribe(sub_id, filter.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn send_initial_filters(damus: &mut Damus, relay_url: &str) {
|
||||
info!("Sending initial filters to {}", relay_url);
|
||||
let timelines = damus.timelines.len();
|
||||
|
||||
for i in 0..timelines {
|
||||
send_initial_timeline_filter(damus, i);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -224,15 +262,20 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
let txn = Transaction::new(&damus.ndb)?;
|
||||
let mut unknown_ids: HashSet<UnknownId> = HashSet::new();
|
||||
for timeline in 0..damus.timelines.len() {
|
||||
let src = TimelineSource::column(timeline);
|
||||
if let Err(err) = src.poll_notes_into_view(damus, &txn, &mut unknown_ids) {
|
||||
error!("{}", err);
|
||||
|
||||
if let Ok(true) = is_timeline_ready(damus, timeline) {
|
||||
if let Err(err) = src.poll_notes_into_view(damus, &mut unknown_ids) {
|
||||
error!("poll_notes_into_view: {err}");
|
||||
}
|
||||
} else {
|
||||
// TODO: show loading?
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
let unknown_ids: Vec<UnknownId> = unknown_ids.into_iter().collect();
|
||||
if let Some(filters) = get_unknown_ids_filter(&unknown_ids) {
|
||||
info!(
|
||||
@@ -242,25 +285,81 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) -> Result<()> {
|
||||
let msg = ClientMessage::req("unknown_ids".to_string(), filters);
|
||||
damus.pool.send(&msg);
|
||||
}
|
||||
*/
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Hash, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum UnknownId<'a> {
|
||||
Pubkey(&'a [u8; 32]),
|
||||
Id(&'a [u8; 32]),
|
||||
/// Check our timeline filter and see if we have any filter data ready.
|
||||
/// Our timelines may require additional data before it is functional. For
|
||||
/// example, when we have to fetch a contact list before we do the actual
|
||||
/// following list query.
|
||||
fn is_timeline_ready(damus: &mut Damus, timeline: usize) -> Result<bool> {
|
||||
let sub = match &damus.timelines[timeline].filter {
|
||||
FilterState::GotRemote(sub) => *sub,
|
||||
FilterState::Ready(_f) => return Ok(true),
|
||||
_ => return Ok(false),
|
||||
};
|
||||
|
||||
// We got at least one eose for our filter request. Let's see
|
||||
// if nostrdb is done processing it yet.
|
||||
let res = damus.ndb.poll_for_notes(sub, 1);
|
||||
if res.is_empty() {
|
||||
debug!("check_timeline_filter_state: no notes found (yet?) for timeline {timeline}");
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
info!("notes found for contact timeline after GotRemote!");
|
||||
|
||||
let note_key = res[0];
|
||||
|
||||
let filter = {
|
||||
let txn = Transaction::new(&damus.ndb).expect("txn");
|
||||
let note = damus.ndb.get_note_by_key(&txn, note_key).expect("note");
|
||||
filter::filter_from_tags(¬e).map(|f| f.into_follow_filter())
|
||||
};
|
||||
|
||||
// TODO: into_follow_filter is hardcoded to contact lists, let's generalize
|
||||
match filter {
|
||||
Err(Error::Filter(e)) => {
|
||||
error!("got broken when building filter {e}");
|
||||
damus.timelines[timeline].filter = FilterState::broken(e);
|
||||
}
|
||||
Err(err) => {
|
||||
error!("got broken when building filter {err}");
|
||||
damus.timelines[timeline].filter = FilterState::broken(FilterError::EmptyContactList);
|
||||
return Err(err);
|
||||
}
|
||||
Ok(filter) => {
|
||||
// we just switched to the ready state, we should send initial
|
||||
// queries and setup the local subscription
|
||||
info!("Found contact list! Setting up local and remote contact list query");
|
||||
setup_initial_timeline(damus, timeline, &filter).expect("setup init");
|
||||
damus.timelines[timeline].filter = FilterState::ready(filter.clone());
|
||||
|
||||
let subid = Uuid::new_v4().to_string();
|
||||
damus.pool.subscribe(subid, filter)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
impl<'a> UnknownId<'a> {
|
||||
pub fn is_pubkey(&self) -> Option<&'a [u8; 32]> {
|
||||
#[derive(Hash, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum UnknownId {
|
||||
Pubkey([u8; 32]),
|
||||
Id([u8; 32]),
|
||||
}
|
||||
|
||||
impl UnknownId {
|
||||
pub fn is_pubkey(&self) -> Option<&[u8; 32]> {
|
||||
match self {
|
||||
UnknownId::Pubkey(pk) => Some(pk),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_id(&self) -> Option<&'a [u8; 32]> {
|
||||
pub fn is_id(&self) -> Option<&[u8; 32]> {
|
||||
match self {
|
||||
UnknownId::Id(id) => Some(id),
|
||||
_ => None,
|
||||
@@ -282,12 +381,12 @@ pub fn get_unknown_note_ids<'a>(
|
||||
txn: &'a Transaction,
|
||||
note: &Note<'a>,
|
||||
note_key: NoteKey,
|
||||
ids: &mut HashSet<UnknownId<'a>>,
|
||||
ids: &mut HashSet<UnknownId>,
|
||||
) -> Result<()> {
|
||||
// the author pubkey
|
||||
|
||||
if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
|
||||
ids.insert(UnknownId::Pubkey(note.pubkey()));
|
||||
ids.insert(UnknownId::Pubkey(*note.pubkey()));
|
||||
}
|
||||
|
||||
// pull notes that notes are replying to
|
||||
@@ -295,14 +394,14 @@ pub fn get_unknown_note_ids<'a>(
|
||||
let note_reply = cached_note.reply.borrow(note.tags());
|
||||
if let Some(root) = note_reply.root() {
|
||||
if ndb.get_note_by_id(txn, root.id).is_err() {
|
||||
ids.insert(UnknownId::Id(root.id));
|
||||
ids.insert(UnknownId::Id(*root.id));
|
||||
}
|
||||
}
|
||||
|
||||
if !note_reply.is_reply_to_root() {
|
||||
if let Some(reply) = note_reply.reply() {
|
||||
if ndb.get_note_by_id(txn, reply.id).is_err() {
|
||||
ids.insert(UnknownId::Id(reply.id));
|
||||
ids.insert(UnknownId::Id(*reply.id));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -317,36 +416,36 @@ pub fn get_unknown_note_ids<'a>(
|
||||
match block.as_mention().unwrap() {
|
||||
Mention::Pubkey(npub) => {
|
||||
if ndb.get_profile_by_pubkey(txn, npub.pubkey()).is_err() {
|
||||
ids.insert(UnknownId::Pubkey(npub.pubkey()));
|
||||
ids.insert(UnknownId::Pubkey(*npub.pubkey()));
|
||||
}
|
||||
}
|
||||
Mention::Profile(nprofile) => {
|
||||
if ndb.get_profile_by_pubkey(txn, nprofile.pubkey()).is_err() {
|
||||
ids.insert(UnknownId::Pubkey(nprofile.pubkey()));
|
||||
ids.insert(UnknownId::Pubkey(*nprofile.pubkey()));
|
||||
}
|
||||
}
|
||||
Mention::Event(ev) => match ndb.get_note_by_id(txn, ev.id()) {
|
||||
Err(_) => {
|
||||
ids.insert(UnknownId::Id(ev.id()));
|
||||
ids.insert(UnknownId::Id(*ev.id()));
|
||||
if let Some(pk) = ev.pubkey() {
|
||||
if ndb.get_profile_by_pubkey(txn, pk).is_err() {
|
||||
ids.insert(UnknownId::Pubkey(pk));
|
||||
ids.insert(UnknownId::Pubkey(*pk));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(note) => {
|
||||
if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
|
||||
ids.insert(UnknownId::Pubkey(note.pubkey()));
|
||||
ids.insert(UnknownId::Pubkey(*note.pubkey()));
|
||||
}
|
||||
}
|
||||
},
|
||||
Mention::Note(note) => match ndb.get_note_by_id(txn, note.id()) {
|
||||
Err(_) => {
|
||||
ids.insert(UnknownId::Id(note.id()));
|
||||
ids.insert(UnknownId::Id(*note.id()));
|
||||
}
|
||||
Ok(note) => {
|
||||
if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
|
||||
ids.insert(UnknownId::Pubkey(note.pubkey()));
|
||||
ids.insert(UnknownId::Pubkey(*note.pubkey()));
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -362,48 +461,61 @@ fn setup_profiling() {
|
||||
puffin::set_scopes_on(true); // tell puffin to collect data
|
||||
}
|
||||
|
||||
fn setup_initial_timeline(damus: &mut Damus, timeline: usize, filters: &[Filter]) -> Result<()> {
|
||||
damus.timelines[timeline].subscription = Some(damus.ndb.subscribe(filters)?);
|
||||
let txn = Transaction::new(&damus.ndb)?;
|
||||
debug!(
|
||||
"querying nostrdb sub {:?} {:?}",
|
||||
damus.timelines[timeline].subscription, damus.timelines[timeline].filter
|
||||
);
|
||||
let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32;
|
||||
let results = damus.ndb.query(&txn, filters, lim)?;
|
||||
|
||||
let filters = {
|
||||
let views = &damus.timelines[timeline].views;
|
||||
let filters: Vec<fn(&CachedNote, &Note) -> bool> =
|
||||
views.iter().map(|v| v.filter.filter()).collect();
|
||||
filters
|
||||
};
|
||||
|
||||
for result in results {
|
||||
for (view, filter) in filters.iter().enumerate() {
|
||||
if filter(
|
||||
damus
|
||||
.note_cache_mut()
|
||||
.cached_note_or_insert_mut(result.note_key, &result.note),
|
||||
&result.note,
|
||||
) {
|
||||
damus.timelines[timeline].views[view].notes.push(NoteRef {
|
||||
key: result.note_key,
|
||||
created_at: result.note.created_at(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn setup_initial_nostrdb_subs(damus: &mut Damus) -> Result<()> {
|
||||
let timelines = damus.timelines.len();
|
||||
for i in 0..timelines {
|
||||
let filters = if let Some(filters) = &damus.timelines[i].filter {
|
||||
filters.clone()
|
||||
} else {
|
||||
// TODO: for unloaded filters, we will need to fetch things like
|
||||
// the contact and relay list from remote relays.
|
||||
error!("TODO: handle unloaded filters");
|
||||
continue;
|
||||
};
|
||||
let filter = damus.timelines[i].filter.clone();
|
||||
match filter {
|
||||
FilterState::Ready(filters) => setup_initial_timeline(damus, i, &filters)?,
|
||||
|
||||
damus.timelines[i].subscription = Some(damus.ndb.subscribe(filters.clone())?);
|
||||
let txn = Transaction::new(&damus.ndb)?;
|
||||
debug!(
|
||||
"querying nostrdb sub {} {:?}",
|
||||
damus.timelines[i].subscription.as_ref().unwrap().id,
|
||||
damus.timelines[i].filter
|
||||
);
|
||||
let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32;
|
||||
let results = damus.ndb.query(&txn, filters.clone(), lim)?;
|
||||
|
||||
let filters = {
|
||||
let views = &damus.timelines[i].views;
|
||||
let filters: Vec<fn(&CachedNote, &Note) -> bool> =
|
||||
views.iter().map(|v| v.filter.filter()).collect();
|
||||
filters
|
||||
};
|
||||
|
||||
for result in results {
|
||||
for (j, filter) in filters.iter().enumerate() {
|
||||
if filter(
|
||||
damus
|
||||
.note_cache_mut()
|
||||
.cached_note_or_insert_mut(result.note_key, &result.note),
|
||||
&result.note,
|
||||
) {
|
||||
damus.timelines[i].views[j].notes.push(NoteRef {
|
||||
key: result.note_key,
|
||||
created_at: result.note.created_at(),
|
||||
})
|
||||
}
|
||||
FilterState::Broken(err) => {
|
||||
error!("FetchingRemote state broken in setup_initial_nostr_subs: {err}")
|
||||
}
|
||||
FilterState::FetchingRemote(_) => {
|
||||
error!("FetchingRemote state in setup_initial_nostr_subs")
|
||||
}
|
||||
FilterState::GotRemote(_) => {
|
||||
error!("GotRemote state in setup_initial_nostr_subs")
|
||||
}
|
||||
FilterState::NeedsRemote(_filters) => {
|
||||
// can't do anything yet, we defer to first connect to send
|
||||
// remote filters
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -435,7 +547,7 @@ fn process_event(damus: &mut Damus, _subid: &str, event: &str) {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_unknown_ids<'a>(txn: &'a Transaction, damus: &mut Damus) -> Result<Vec<UnknownId<'a>>> {
|
||||
fn get_unknown_ids(txn: &Transaction, damus: &mut Damus) -> Result<Vec<UnknownId>> {
|
||||
#[cfg(feature = "profiling")]
|
||||
puffin::profile_function!();
|
||||
|
||||
@@ -475,11 +587,12 @@ fn get_unknown_ids<'a>(txn: &'a Transaction, damus: &mut Damus) -> Result<Vec<Un
|
||||
Ok(ids.into_iter().collect())
|
||||
}
|
||||
|
||||
fn get_unknown_ids_filter(ids: &[UnknownId<'_>]) -> Option<Vec<Filter>> {
|
||||
fn get_unknown_ids_filter(ids: &[UnknownId]) -> Option<Vec<Filter>> {
|
||||
if ids.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let ids = &ids[0..500.min(ids.len())];
|
||||
let mut filters: Vec<Filter> = vec![];
|
||||
|
||||
let pks: Vec<&[u8; 32]> = ids.iter().flat_map(|id| id.is_pubkey()).collect();
|
||||
@@ -498,19 +611,75 @@ fn get_unknown_ids_filter(ids: &[UnknownId<'_>]) -> Option<Vec<Filter>> {
|
||||
}
|
||||
|
||||
fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> {
|
||||
if subid.starts_with("initial") {
|
||||
let txn = Transaction::new(&damus.ndb)?;
|
||||
let ids = get_unknown_ids(&txn, damus)?;
|
||||
if let Some(filters) = get_unknown_ids_filter(&ids) {
|
||||
info!("Getting {} unknown ids from {}", ids.len(), relay_url);
|
||||
let msg = ClientMessage::req("unknown_ids".to_string(), filters);
|
||||
damus.pool.send_to(&msg, relay_url);
|
||||
}
|
||||
} else if subid == "unknown_ids" {
|
||||
let msg = ClientMessage::close("unknown_ids".to_string());
|
||||
damus.pool.send_to(&msg, relay_url);
|
||||
let sub_kind = if let Some(sub_kind) = damus.subscriptions().get(subid) {
|
||||
sub_kind
|
||||
} else {
|
||||
warn!("got unknown eose subid {}", subid);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
match *sub_kind {
|
||||
SubKind::Initial => {
|
||||
let txn = Transaction::new(&damus.ndb)?;
|
||||
let ids = get_unknown_ids(&txn, damus)?;
|
||||
if let Some(filters) = get_unknown_ids_filter(&ids) {
|
||||
info!("Getting {} unknown ids from {}", ids.len(), relay_url);
|
||||
let sub_id = Uuid::new_v4().to_string();
|
||||
|
||||
let msg = ClientMessage::req(sub_id.clone(), filters);
|
||||
// unknownids are a oneshot request
|
||||
damus.subscriptions().insert(sub_id, SubKind::OneShot);
|
||||
damus.pool.send_to(&msg, relay_url);
|
||||
}
|
||||
}
|
||||
|
||||
// oneshot subs just close when they're done
|
||||
SubKind::OneShot => {
|
||||
let msg = ClientMessage::close(subid.to_string());
|
||||
damus.pool.send_to(&msg, relay_url);
|
||||
}
|
||||
|
||||
SubKind::FetchingContactList(timeline_uid) => {
|
||||
let timeline_ind = if let Some(i) = damus.find_timeline(timeline_uid) {
|
||||
i
|
||||
} else {
|
||||
error!(
|
||||
"timeline uid:{} not found for FetchingContactList",
|
||||
timeline_uid
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let local_sub = if let FilterState::FetchingRemote(unisub) =
|
||||
&damus.timelines[timeline_ind].filter
|
||||
{
|
||||
unisub.local
|
||||
} else {
|
||||
// TODO: we could have multiple contact list results, we need
|
||||
// to check to see if this one is newer and use that instead
|
||||
warn!(
|
||||
"Expected timeline to have FetchingRemote state but was {:?}",
|
||||
damus.timelines[timeline_ind].filter
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
damus.timelines[timeline_ind].filter = FilterState::got_remote(local_sub);
|
||||
|
||||
/*
|
||||
// see if we're fast enough to catch a processed contact list
|
||||
let note_keys = damus.ndb.poll_for_notes(local_sub, 1);
|
||||
if !note_keys.is_empty() {
|
||||
debug!("fast! caught contact list from {relay_url} right away");
|
||||
let txn = Transaction::new(&damus.ndb)?;
|
||||
let note_key = note_keys[0];
|
||||
let nr = damus.ndb.get_note_by_key(&txn, note_key)?;
|
||||
let filter = filter::filter_from_tags(&nr)?.into_follow_filter();
|
||||
setup_initial_timeline(damus, timeline, &filter)
|
||||
damus.timelines[timeline_ind].filter = FilterState::ready(filter);
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -627,15 +796,18 @@ impl Damus {
|
||||
.as_ref()
|
||||
.map(|a| a.pubkey.bytes());
|
||||
let ndb = Ndb::new(&dbpath, &config).expect("ndb");
|
||||
let timelines = parsed_args
|
||||
.columns
|
||||
.into_iter()
|
||||
.map(|c| c.into_timeline(&ndb, account))
|
||||
.collect();
|
||||
|
||||
let mut timelines: Vec<Timeline> = Vec::with_capacity(parsed_args.columns.len());
|
||||
for col in parsed_args.columns {
|
||||
if let Some(timeline) = col.into_timeline(&ndb, account) {
|
||||
timelines.push(timeline);
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
pool,
|
||||
is_mobile,
|
||||
subscriptions: Subscriptions::default(),
|
||||
since_optimize: parsed_args.since_optimize,
|
||||
threads: Threads::default(),
|
||||
drafts: Drafts::default(),
|
||||
@@ -657,7 +829,10 @@ impl Damus {
|
||||
pub fn mock<P: AsRef<Path>>(data_path: P, is_mobile: bool) -> Self {
|
||||
let mut timelines: Vec<Timeline> = vec![];
|
||||
let filter = Filter::from_json(include_str!("../queries/global.json")).unwrap();
|
||||
timelines.push(Timeline::new(ColumnKind::Universe, Some(vec![filter])));
|
||||
timelines.push(Timeline::new(
|
||||
ColumnKind::Universe,
|
||||
FilterState::ready(vec![filter]),
|
||||
));
|
||||
|
||||
let imgcache_dir = data_path.as_ref().join(ImageCache::rel_datadir());
|
||||
let _ = std::fs::create_dir_all(imgcache_dir.clone());
|
||||
@@ -666,6 +841,7 @@ impl Damus {
|
||||
config.set_ingester_threads(2);
|
||||
Self {
|
||||
is_mobile,
|
||||
subscriptions: Subscriptions::default(),
|
||||
since_optimize: true,
|
||||
threads: Threads::default(),
|
||||
drafts: Drafts::default(),
|
||||
@@ -685,6 +861,20 @@ impl Damus {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn find_timeline(&self, uid: u32) -> Option<usize> {
|
||||
for (i, timeline) in self.timelines.iter().enumerate() {
|
||||
if timeline.uid == uid {
|
||||
return Some(i);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub fn subscriptions(&mut self) -> &mut HashMap<String, SubKind> {
|
||||
&mut self.subscriptions.subs
|
||||
}
|
||||
|
||||
pub fn note_cache_mut(&mut self) -> &mut NoteCache {
|
||||
&mut self.note_cache
|
||||
}
|
||||
@@ -831,13 +1021,17 @@ fn thread_unsubscribe(app: &mut Damus, id: &[u8; 32]) {
|
||||
};
|
||||
|
||||
match unsubscribe {
|
||||
Ok(DecrementResult::LastSubscriber(sub_id)) => {
|
||||
if let Err(e) = app.ndb.unsubscribe(sub_id) {
|
||||
error!("failed to unsubscribe from thread: {e}, subid:{sub_id}, {} active subscriptions", app.ndb.subscription_count());
|
||||
Ok(DecrementResult::LastSubscriber(sub)) => {
|
||||
if let Err(e) = app.ndb.unsubscribe(sub) {
|
||||
error!(
|
||||
"failed to unsubscribe from thread: {e}, subid:{}, {} active subscriptions",
|
||||
sub.id(),
|
||||
app.ndb.subscription_count()
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
"Unsubscribed from thread subid:{}. {} active subscriptions",
|
||||
sub_id,
|
||||
sub.id(),
|
||||
app.ndb.subscription_count()
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user