diff --git a/crates/enostr/src/client/message.rs b/crates/enostr/src/client/message.rs index e912cdf2..f5ac36e0 100644 --- a/crates/enostr/src/client/message.rs +++ b/crates/enostr/src/client/message.rs @@ -3,7 +3,7 @@ use nostrdb::Filter; use serde_json::json; /// Messages sent by clients, received by relays -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ClientMessage { Event { note: Note, diff --git a/crates/enostr/src/lib.rs b/crates/enostr/src/lib.rs index e33e563a..e6d7d162 100644 --- a/crates/enostr/src/lib.rs +++ b/crates/enostr/src/lib.rs @@ -18,6 +18,7 @@ pub use profile::Profile; pub use pubkey::Pubkey; pub use relay::message::{RelayEvent, RelayMessage}; pub use relay::pool::{PoolEvent, RelayPool}; +pub use relay::subs_debug::{OwnedRelayEvent, RelayLogEvent, SubsDebug, TransferStats}; pub use relay::{Relay, RelayStatus}; pub type Result = std::result::Result; diff --git a/crates/enostr/src/relay/message.rs b/crates/enostr/src/relay/message.rs index a7dd12e5..bc0cb589 100644 --- a/crates/enostr/src/relay/message.rs +++ b/crates/enostr/src/relay/message.rs @@ -8,6 +8,12 @@ pub struct CommandResult<'a> { message: &'a str, } +pub fn calculate_command_result_size(result: &CommandResult) -> usize { + std::mem::size_of_val(result) + + result.event_id.as_bytes().len() + + result.message.as_bytes().len() +} + #[derive(Debug, Eq, PartialEq)] pub enum RelayMessage<'a> { OK(CommandResult<'a>), diff --git a/crates/enostr/src/relay/mod.rs b/crates/enostr/src/relay/mod.rs index 532252b4..68554f75 100644 --- a/crates/enostr/src/relay/mod.rs +++ b/crates/enostr/src/relay/mod.rs @@ -8,6 +8,7 @@ use tracing::{debug, error, info}; pub mod message; pub mod pool; +pub mod subs_debug; #[derive(Debug)] pub enum RelayStatus { diff --git a/crates/enostr/src/relay/subs_debug.rs b/crates/enostr/src/relay/subs_debug.rs new file mode 100644 index 00000000..32db510e --- /dev/null +++ b/crates/enostr/src/relay/subs_debug.rs @@ -0,0 +1,289 @@ +use std::{collections::HashMap, mem, time::SystemTime}; + +use ewebsock::WsMessage; +use nostrdb::Filter; + +use crate::{ClientMessage, Error, Note, RelayEvent, RelayMessage}; + +use super::message::calculate_command_result_size; + +type RelayId = String; +type SubId = String; + +pub struct SubsDebug { + data: HashMap, + time_incd: SystemTime, + pub relay_events_selection: Option, +} + +#[derive(Default)] +pub struct RelayStats { + pub count: TransferStats, + pub events: Vec, + pub sub_data: HashMap, +} + +#[derive(Clone)] +pub enum RelayLogEvent { + Send(ClientMessage), + Recieve(OwnedRelayEvent), +} + +#[derive(Clone)] +pub enum OwnedRelayEvent { + Opened, + Closed, + Other(String), + Error(String), + Message(String), +} + +impl From> for OwnedRelayEvent { + fn from(value: RelayEvent<'_>) -> Self { + match value { + RelayEvent::Opened => OwnedRelayEvent::Opened, + RelayEvent::Closed => OwnedRelayEvent::Closed, + RelayEvent::Other(ws_message) => { + let ws_str = match ws_message { + WsMessage::Binary(_) => "Binary".to_owned(), + WsMessage::Text(t) => format!("Text:{}", t), + WsMessage::Unknown(u) => format!("Unknown:{}", u), + WsMessage::Ping(_) => "Ping".to_owned(), + WsMessage::Pong(_) => "Pong".to_owned(), + }; + OwnedRelayEvent::Other(ws_str) + } + RelayEvent::Error(error) => OwnedRelayEvent::Error(error.to_string()), + RelayEvent::Message(relay_message) => { + let relay_msg = match relay_message { + RelayMessage::OK(_) => "OK".to_owned(), + RelayMessage::Eose(s) => format!("EOSE:{}", s), + RelayMessage::Event(_, s) => format!("EVENT:{}", s), + RelayMessage::Notice(s) => format!("NOTICE:{}", s), + }; + OwnedRelayEvent::Message(relay_msg) + } + } + } +} + +#[derive(PartialEq, Eq, Hash, Clone)] +pub struct RelaySub { + pub(crate) subid: String, + pub(crate) filter: String, +} + +#[derive(Default)] +pub struct SubStats { + pub filter: String, + pub count: TransferStats, +} + +#[derive(Default)] +pub struct TransferStats { + pub up_total: usize, + pub down_total: usize, + + // 1 sec < last tick < 2 sec + pub up_sec_prior: usize, + pub down_sec_prior: usize, + + // < 1 sec since last tick + up_sec_cur: usize, + down_sec_cur: usize, +} + +impl Default for SubsDebug { + fn default() -> Self { + Self { + data: Default::default(), + time_incd: SystemTime::now(), + relay_events_selection: None, + } + } +} + +impl SubsDebug { + pub fn get_data(&self) -> &HashMap { + &self.data + } + + pub(crate) fn send_cmd(&mut self, relay: String, cmd: &ClientMessage) { + let data = self.data.entry(relay).or_default(); + let msg_num_bytes = calculate_client_message_size(cmd); + match cmd { + ClientMessage::Req { sub_id, filters } => { + data.sub_data.insert( + sub_id.to_string(), + SubStats { + filter: filters_to_string(filters), + count: Default::default(), + }, + ); + } + + ClientMessage::Close { sub_id } => { + data.sub_data.remove(sub_id); + } + + _ => {} + } + + data.count.up_sec_cur += msg_num_bytes; + + data.events.push(RelayLogEvent::Send(cmd.clone())); + } + + pub(crate) fn receive_cmd(&mut self, relay: String, cmd: RelayEvent) { + let data = self.data.entry(relay).or_default(); + let msg_num_bytes = calculate_relay_event_size(&cmd); + if let RelayEvent::Message(RelayMessage::Event(sid, _)) = cmd { + if let Some(sub_data) = data.sub_data.get_mut(sid) { + let c = &mut sub_data.count; + c.down_sec_cur += msg_num_bytes; + } + }; + + data.count.down_sec_cur += msg_num_bytes; + + data.events.push(RelayLogEvent::Recieve(cmd.into())); + } + + pub fn try_increment_stats(&mut self) { + let cur_time = SystemTime::now(); + if let Ok(dur) = cur_time.duration_since(self.time_incd) { + if dur.as_secs() >= 1 { + self.time_incd = cur_time; + self.internal_inc_stats(); + } + } + } + + fn internal_inc_stats(&mut self) { + for relay_data in self.data.values_mut() { + let c = &mut relay_data.count; + inc_data_count(c); + + for sub in relay_data.sub_data.values_mut() { + inc_data_count(&mut sub.count); + } + } + } +} + +fn inc_data_count(c: &mut TransferStats) { + c.up_total += c.up_sec_cur; + c.up_sec_prior = c.up_sec_cur; + + c.down_total += c.down_sec_cur; + c.down_sec_prior = c.down_sec_cur; + + c.up_sec_cur = 0; + c.down_sec_cur = 0; +} + +fn calculate_note_size(note: &Note) -> usize { + let stack_size = mem::size_of_val(¬e.id) + + mem::size_of_val(¬e.pubkey) + + mem::size_of_val(¬e.created_at) + + mem::size_of_val(¬e.kind) + + mem::size_of_val(¬e.tags) + + mem::size_of_val(¬e.content) + + mem::size_of_val(¬e.sig); + + let heap_size = note + .tags + .iter() + .map(|tag| { + tag.iter().map(|s| s.len()).sum::() + tag.len() * mem::size_of::() + }) + .sum::() + + note.tags.len() * mem::size_of::>() + + note.content.len() + + note.sig.len(); + + stack_size + heap_size +} + +fn calculate_client_message_size(message: &ClientMessage) -> usize { + match message { + ClientMessage::Event { note } => mem::size_of_val(message) + calculate_note_size(note), + ClientMessage::Req { sub_id, filters } => { + mem::size_of_val(message) + + mem::size_of_val(sub_id) + + sub_id.as_bytes().len() + + filters.iter().map(mem::size_of_val).sum::() + } + ClientMessage::Close { sub_id } => { + mem::size_of_val(message) + mem::size_of_val(sub_id) + sub_id.as_bytes().len() + } + ClientMessage::Raw(data) => mem::size_of_val(message) + data.as_bytes().len(), + } +} + +fn calculate_relay_event_size(event: &RelayEvent<'_>) -> usize { + let base_size = mem::size_of_val(event); // Size of the enum on the stack + + let variant_size = match event { + RelayEvent::Opened | RelayEvent::Closed => 0, // No additional data + RelayEvent::Other(ws_message) => calculate_ws_message_size(ws_message), + RelayEvent::Error(error) => calculate_error_size(error), + RelayEvent::Message(message) => calculate_relay_message_size(message), + }; + + base_size + variant_size +} + +fn calculate_ws_message_size(message: &WsMessage) -> usize { + match message { + WsMessage::Binary(vec) | WsMessage::Ping(vec) | WsMessage::Pong(vec) => { + mem::size_of_val(message) + vec.len() + } + WsMessage::Text(string) | WsMessage::Unknown(string) => { + mem::size_of_val(message) + string.as_bytes().len() + } + } +} + +fn calculate_error_size(error: &Error) -> usize { + match error { + Error::Empty + | Error::DecodeFailed + | Error::HexDecodeFailed + | Error::InvalidBech32 + | Error::InvalidByteSize + | Error::InvalidSignature + | Error::InvalidPublicKey => mem::size_of_val(error), // No heap usage + + Error::Json(json_err) => mem::size_of_val(error) + json_err.to_string().as_bytes().len(), + + Error::Nostrdb(nostrdb_err) => { + mem::size_of_val(error) + nostrdb_err.to_string().as_bytes().len() + } + + Error::Generic(string) => mem::size_of_val(error) + string.as_bytes().len(), + } +} + +fn calculate_relay_message_size(message: &RelayMessage) -> usize { + match message { + RelayMessage::OK(result) => calculate_command_result_size(result), + RelayMessage::Eose(str_ref) + | RelayMessage::Event(str_ref, _) + | RelayMessage::Notice(str_ref) => mem::size_of_val(message) + str_ref.as_bytes().len(), + } +} + +fn filters_to_string(f: &Vec) -> String { + let mut cur_str = String::new(); + for filter in f { + if let Ok(json) = filter.json() { + if !cur_str.is_empty() { + cur_str.push_str(", "); + } + cur_str.push_str(&json); + } + } + + cur_str +}