pool: implement keepalive pinging

To prevent us from disconnecting, introduce keepalive pinging. In the
event loop we check if any relays need a refresh ping.
This commit is contained in:
William Casarin
2023-12-24 12:25:35 -08:00
parent dd7093288b
commit 09cd8ff379
2 changed files with 52 additions and 10 deletions

View File

@@ -2,6 +2,8 @@ use crate::relay::message::RelayEvent;
use crate::relay::Relay; use crate::relay::Relay;
use crate::{ClientMessage, Result}; use crate::{ClientMessage, Result};
use std::time::{Duration, Instant};
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use ewebsock::WsMessage; use ewebsock::WsMessage;
@@ -16,25 +18,42 @@ pub struct PoolEvent<'a> {
pub event: RelayEvent, pub event: RelayEvent,
} }
pub struct RelayPool { pub struct PoolRelay {
pub relays: Vec<Relay>, pub relay: Relay,
pub last_ping: Instant,
} }
impl Default for RelayPool { impl PoolRelay {
fn default() -> RelayPool { pub fn new(relay: Relay) -> PoolRelay {
RelayPool { relays: Vec::new() } PoolRelay {
relay: relay,
last_ping: Instant::now(),
}
} }
} }
pub struct RelayPool {
pub relays: Vec<PoolRelay>,
pub ping_rate: Duration,
}
impl RelayPool { impl RelayPool {
// Constructs a new, empty RelayPool. // Constructs a new, empty RelayPool.
pub fn new() -> RelayPool { pub fn new() -> RelayPool {
RelayPool { relays: vec![] } RelayPool {
relays: vec![],
ping_rate: Duration::from_secs(25),
}
}
pub fn ping_rate(&mut self, duration: Duration) -> &mut Self {
self.ping_rate = duration;
self
} }
pub fn has(&self, url: &str) -> bool { pub fn has(&self, url: &str) -> bool {
for relay in &self.relays { for relay in &self.relays {
if &relay.url == url { if &relay.relay.url == url {
return true; return true;
} }
} }
@@ -43,12 +62,27 @@ impl RelayPool {
pub fn send(&mut self, cmd: &ClientMessage) { pub fn send(&mut self, cmd: &ClientMessage) {
for relay in &mut self.relays { for relay in &mut self.relays {
relay.send(cmd); relay.relay.send(cmd);
}
}
/// Keep relay connectiongs alive by pinging relays that haven't been
/// pinged in awhile. Adjust ping rate with [`ping_rate`].
pub fn keepalive_ping(&mut self) {
for relay in &mut self.relays {
let now = std::time::Instant::now();
let should_ping = now - relay.last_ping > self.ping_rate;
if should_ping {
debug!("pinging {}", relay.relay.url);
relay.relay.ping();
relay.last_ping = Instant::now();
}
} }
} }
pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) { pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) {
for relay in &mut self.relays { for relay in &mut self.relays {
let relay = &mut relay.relay;
if relay.url == relay_url { if relay.url == relay_url {
relay.send(cmd); relay.send(cmd);
return; return;
@@ -63,8 +97,9 @@ impl RelayPool {
wakeup: impl Fn() + Send + Sync + 'static, wakeup: impl Fn() + Send + Sync + 'static,
) -> Result<()> { ) -> Result<()> {
let relay = Relay::new(url, wakeup)?; let relay = Relay::new(url, wakeup)?;
let pool_relay = PoolRelay::new(relay);
self.relays.push(relay); self.relays.push(pool_relay);
Ok(()) Ok(())
} }
@@ -72,6 +107,7 @@ impl RelayPool {
/// Attempts to receive a pool event from a list of relays. The function searches each relay in the list in order, attempting to receive a message from each. If a message is received, return it. If no message is received from any relays, None is returned. /// Attempts to receive a pool event from a list of relays. The function searches each relay in the list in order, attempting to receive a message from each. If a message is received, return it. If no message is received from any relays, None is returned.
pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> { pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> {
for relay in &mut self.relays { for relay in &mut self.relays {
let relay = &mut relay.relay;
if let Some(msg) = relay.receiver.try_recv() { if let Some(msg) = relay.receiver.try_recv() {
match msg.try_into() { match msg.try_into() {
Ok(event) => { Ok(event) => {

View File

@@ -13,6 +13,7 @@ use enostr::{ClientMessage, EventId, Filter, Profile, Pubkey, RelayEvent, RelayM
use poll_promise::Promise; use poll_promise::Promise;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::time::Duration;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use enostr::{Event, RelayPool}; use enostr::{Event, RelayPool};
@@ -62,7 +63,7 @@ impl Default for Damus {
state: DamusState::Initializing, state: DamusState::Initializing,
contacts: Contacts::new(), contacts: Contacts::new(),
all_events: HashMap::new(), all_events: HashMap::new(),
pool: RelayPool::default(), pool: RelayPool::new(),
events: vec![], events: vec![],
img_cache: HashMap::new(), img_cache: HashMap::new(),
n_panels: 1, n_panels: 1,
@@ -99,6 +100,7 @@ fn send_initial_filters(pool: &mut RelayPool, relay_url: &str) {
let subid = "initial"; let subid = "initial";
for relay in &mut pool.relays { for relay in &mut pool.relays {
let relay = &mut relay.relay;
if relay.url == relay_url { if relay.url == relay_url {
relay.subscribe(subid.to_string(), vec![filter]); relay.subscribe(subid.to_string(), vec![filter]);
return; return;
@@ -114,6 +116,8 @@ fn try_process_event(damus: &mut Damus, ctx: &egui::Context) {
ctx.set_pixels_per_point(ctx.pixels_per_point() - amount); ctx.set_pixels_per_point(ctx.pixels_per_point() - amount);
} }
damus.pool.keepalive_ping();
// pool stuff // pool stuff
if let Some(ev) = damus.pool.try_recv() { if let Some(ev) = damus.pool.try_recv() {
let relay = ev.relay.to_owned(); let relay = ev.relay.to_owned();
@@ -251,6 +255,8 @@ fn render_damus(damus: &mut Damus, ctx: &Context) {
render_damus_desktop(ctx, damus); render_damus_desktop(ctx, damus);
} }
ctx.request_repaint_after(Duration::from_secs(1));
#[cfg(feature = "profiling")] #[cfg(feature = "profiling")]
puffin_egui::profiler_window(ctx); puffin_egui::profiler_window(ctx);
} }