diff --git a/damus/Models/EventsModel.swift b/damus/Models/EventsModel.swift index 2ef83e42..cb110226 100644 --- a/damus/Models/EventsModel.swift +++ b/damus/Models/EventsModel.swift @@ -31,9 +31,9 @@ class EventsModel: ObservableObject { } func subscribe() { - state.pool.subscribe(sub_id: sub_id, - filters: [get_filter()], - handler: handle_nostr_event) + state.pool.subscribe_to(sub_id: sub_id, + filters: [get_filter()], + handler: handle_nostr_event) } func unsubscribe() { diff --git a/damus/Models/FollowersModel.swift b/damus/Models/FollowersModel.swift index ae05a19e..10e19d3f 100644 --- a/damus/Models/FollowersModel.swift +++ b/damus/Models/FollowersModel.swift @@ -40,7 +40,7 @@ class FollowersModel: ObservableObject { let filter = get_filter() let filters = [filter] print_filters(relay_id: "following", filters: [filters]) - self.damus_state.pool.subscribe(sub_id: sub_id, filters: filters, handler: handle_event) + self.damus_state.pool.subscribe_to(sub_id: sub_id, filters: filters, handler: handle_event) } func unsubscribe() { diff --git a/damus/Models/FollowingModel.swift b/damus/Models/FollowingModel.swift index 30bab9ae..c22237aa 100644 --- a/damus/Models/FollowingModel.swift +++ b/damus/Models/FollowingModel.swift @@ -41,7 +41,7 @@ class FollowingModel { } let filters = [filter] print_filters(relay_id: "following", filters: [filters]) - self.damus_state.pool.subscribe(sub_id: sub_id, filters: filters, handler: handle_event) + self.damus_state.pool.subscribe_to(sub_id: sub_id, filters: filters, handler: handle_event) } func unsubscribe() { diff --git a/damus/Models/ProfileModel.swift b/damus/Models/ProfileModel.swift index 5e7f2b82..024197d2 100644 --- a/damus/Models/ProfileModel.swift +++ b/damus/Models/ProfileModel.swift @@ -83,8 +83,8 @@ class ProfileModel: ObservableObject, Equatable { print("subscribing to profile \(pubkey) with sub_id \(sub_id)") print_filters(relay_id: "profile", filters: [[text_filter], [profile_filter]]) - damus.pool.subscribe(sub_id: sub_id, filters: [text_filter], handler: handle_event) - damus.pool.subscribe(sub_id: prof_subid, filters: [profile_filter], handler: handle_event) + damus.pool.subscribe_to(sub_id: sub_id, filters: [text_filter], handler: handle_event) + damus.pool.subscribe_to(sub_id: prof_subid, filters: [profile_filter], handler: handle_event) } func handle_profile_contact_event(_ ev: NostrEvent) { diff --git a/damus/Models/SearchHomeModel.swift b/damus/Models/SearchHomeModel.swift index 9f5aec3a..0110de39 100644 --- a/damus/Models/SearchHomeModel.swift +++ b/damus/Models/SearchHomeModel.swift @@ -38,7 +38,7 @@ class SearchHomeModel: ObservableObject { func subscribe() { loading = true let to_relays = determine_to_relays(pool: damus_state.pool, filters: damus_state.relay_filters) - damus_state.pool.subscribe(sub_id: base_subid, filters: [get_base_filter()], handler: handle_event, to: to_relays) + damus_state.pool.subscribe_to(sub_id: base_subid, filters: [get_base_filter()], to: to_relays, handler: handle_event) } func unsubscribe(to: String? = nil) { diff --git a/damus/Models/ZapsModel.swift b/damus/Models/ZapsModel.swift index eaf0f334..91f099da 100644 --- a/damus/Models/ZapsModel.swift +++ b/damus/Models/ZapsModel.swift @@ -29,7 +29,7 @@ class ZapsModel: ObservableObject { case .note(let note_target): filter.referenced_ids = [note_target.note_id] } - state.pool.subscribe(sub_id: zaps_subid, filters: [filter], handler: handle_event) + state.pool.subscribe_to(sub_id: zaps_subid, filters: [filter], handler: handle_event) } func unsubscribe() { diff --git a/damus/Nostr/RelayConnection.swift b/damus/Nostr/RelayConnection.swift index 8b787fcd..5efd778d 100644 --- a/damus/Nostr/RelayConnection.swift +++ b/damus/Nostr/RelayConnection.swift @@ -14,9 +14,15 @@ enum NostrConnectionEvent { } final class RelayConnection: WebSocketDelegate { - private(set) var isConnected = false - private(set) var isConnecting = false - private(set) var isReconnecting = false + enum State { + case notConnected + case connecting + case reconnecting + case connected + case failed + } + + private(set) var state: State = .notConnected private(set) var last_connection_attempt: TimeInterval = 0 private lazy var socket = { @@ -25,38 +31,36 @@ final class RelayConnection: WebSocketDelegate { socket.delegate = self return socket }() - private var handleEvent: (NostrConnectionEvent) -> () + private let eventHandler: (NostrConnectionEvent) -> () let url: URL - - init(url: URL, handleEvent: @escaping (NostrConnectionEvent) -> ()) { + + init(url: URL, eventHandler: @escaping (NostrConnectionEvent) -> ()) { self.url = url - self.handleEvent = handleEvent + self.eventHandler = eventHandler } func reconnect() { - if isConnected { - isReconnecting = true + if state == .connected { + state = .reconnecting disconnect() } else { // we're already disconnected, so just connect - connect(force: true) + connect() } } func connect(force: Bool = false) { - if !force && (isConnected || isConnecting) { + if !force && (state == .connected || state == .connecting) { return } - isConnecting = true + state = .connecting last_connection_attempt = Date().timeIntervalSince1970 socket.connect() } func disconnect() { socket.disconnect() - isConnected = false - isConnecting = false } func send(_ req: NostrRequest) { @@ -68,51 +72,52 @@ final class RelayConnection: WebSocketDelegate { socket.write(string: req) } + private func decodeEvent(_ txt: String) throws -> NostrConnectionEvent { + if let ev = decode_nostr_event(txt: txt) { + return .nostr_event(ev) + } else { + throw DecodingError.dataCorrupted(.init(codingPath: [], debugDescription: "decoding event failed")) + } + } + + @MainActor + private func handleEvent(_ event: NostrConnectionEvent) async { + eventHandler(event) + } + // MARK: - WebSocketDelegate func didReceive(event: WebSocketEvent, client: WebSocket) { switch event { case .connected: - self.isConnected = true - self.isConnecting = false + state = .connected case .disconnected: - self.isConnecting = false - self.isConnected = false - if self.isReconnecting { - self.isReconnecting = false - self.connect() + if state == .reconnecting { + connect() + } else { + state = .notConnected } case .cancelled, .error: - self.isConnecting = false - self.isConnected = false + state = .failed case .text(let txt): - if txt.count > 2000 { - DispatchQueue.global(qos: .default).async { - if let ev = decode_nostr_event(txt: txt) { - DispatchQueue.main.async { - self.handleEvent(.nostr_event(ev)) - } - return - } - } - } else { - if let ev = decode_nostr_event(txt: txt) { - handleEvent(.nostr_event(ev)) - return + Task(priority: .userInitiated) { + do { + let event = try decodeEvent(txt) + await handleEvent(event) + } catch { + print("decode failed for \(txt): \(error)") + // TODO: trigger event error } } - print("decode failed for \(txt)") - // TODO: trigger event error - default: break } - handleEvent(.ws_event(event)) + eventHandler(.ws_event(event)) } } diff --git a/damus/Nostr/RelayPool.swift b/damus/Nostr/RelayPool.swift index e7d95e24..0a693a83 100644 --- a/damus/Nostr/RelayPool.swift +++ b/damus/Nostr/RelayPool.swift @@ -7,22 +7,6 @@ import Foundation -struct SubscriptionId: Identifiable, CustomStringConvertible { - let id: String - - var description: String { - id - } -} - -struct RelayId: Identifiable, CustomStringConvertible { - let id: String - - var description: String { - id - } -} - struct RelayHandler { let sub_id: String let callback: (String, NostrConnectionEvent) -> () @@ -33,22 +17,21 @@ struct QueuedRequest { let relay: String } -struct NostrRequestId: Equatable, Hashable { - let relay: String? - let sub_id: String -} - -class RelayPool { - /// Used for an exponential backoff algorithm when retrying stale connections - /// Each retry attempt will be delayed by raising this base delay to an exponent - /// equal to the number of previous retries. - private static let base_reconnect_delay: TimeInterval = 2 +final class RelayPool { + private enum Constants { + /// Used for an exponential backoff algorithm when retrying stale connections + /// Each retry attempt will be delayed by raising this base delay to an exponent + /// equal to the number of previous retries. + static let base_reconnect_delay: TimeInterval = 2 + static let max_queued_requests = 10 + static let max_retry_attempts = 3 + } - var relays: [Relay] = [] - var handlers: [RelayHandler] = [] - var request_queue: [QueuedRequest] = [] - var seen: Set = Set() - var counts: [String: UInt64] = [:] + private(set) var relays: [Relay] = [] + private var handlers: [RelayHandler] = [] + private var request_queue: [QueuedRequest] = [] + private var seen: Set = Set() + private var counts: [String: UInt64] = [:] private var retry_attempts_per_relay: [URL: Int] = [:] var descriptors: [RelayDescriptor] { @@ -56,37 +39,28 @@ class RelayPool { } var num_connecting: Int { - return relays.reduce(0) { n, r in n + (r.connection.isConnecting ? 1 : 0) } + relays.reduce(0) { n, r in n + (r.connection.state == .connecting ? 1 : 0) } } - func remove_handler(sub_id: String) { + private func remove_handler(sub_id: String) { self.handlers = handlers.filter { $0.sub_id != sub_id } print("removing \(sub_id) handler, current: \(handlers.count)") } func register_handler(sub_id: String, handler: @escaping (String, NostrConnectionEvent) -> ()) { - for handler in handlers { - // don't add duplicate handlers - if handler.sub_id == sub_id { - return - } + guard !handlers.contains(where: { $0.sub_id == sub_id }) else { + return // don't add duplicate handlers } - self.handlers.append(RelayHandler(sub_id: sub_id, callback: handler)) + + handlers.append(RelayHandler(sub_id: sub_id, callback: handler)) print("registering \(sub_id) handler, current: \(self.handlers.count)") } func remove_relay(_ relay_id: String) { - var i: Int = 0 + disconnect(from: [relay_id]) - self.disconnect(to: [relay_id]) - - for relay in relays { - if relay.id == relay_id { - relays.remove(at: i) - break - } - - i += 1 + if let index = relays.firstIndex(where: { $0.id == relay_id }) { + relays.remove(at: index) } } @@ -105,38 +79,51 @@ class RelayPool { /// This is used to retry dead connections func connect_to_disconnected() { - for relay in relays where !relay.is_broken && !relay.connection.isConnected { + for relay in relays where !relay.is_broken && relay.connection.state != .connected { let c = relay.connection - let is_connecting = c.isReconnecting || c.isConnecting + let is_connecting = c.state == .reconnecting || c.state == .connecting let retry_attempts = retry_attempts_per_relay[c.url] ?? 0 - let delay = pow(RelayPool.base_reconnect_delay, TimeInterval(retry_attempts_per_relay[c.url] ?? 0)) + + let delay = pow(Constants.base_reconnect_delay, TimeInterval(retry_attempts + 1)) // the + 1 helps us avoid a 1-second delay for the first retry if is_connecting && (Date.now.timeIntervalSince1970 - c.last_connection_attempt) > delay { - print("stale connection detected (\(relay.descriptor.url.absoluteString)). retrying after \(delay) seconds...") - relay.connection.connect(force: true) - retry_attempts_per_relay[c.url] = retry_attempts + 1 + if retry_attempts > Constants.max_retry_attempts { + if c.state != .notConnected { + c.disconnect() + print("exceeded max connection attempts with \(relay.descriptor.url.absoluteString)") + relay.mark_broken() + } + continue + } else { + print("stale connection detected (\(relay.descriptor.url.absoluteString)). retrying after \(delay) seconds...") + c.connect(force: true) + retry_attempts_per_relay[c.url] = retry_attempts + 1 + } } else if is_connecting { continue } else { - relay.connection.reconnect() + c.reconnect() } - } } - func reconnect(to: [String]? = nil) { - let relays = to.map{ get_relays($0) } ?? self.relays - for relay in relays { + func reconnect(to relay_ids: [String]? = nil) { + let relays: [Relay] + if let relay_ids { + relays = get_relays(relay_ids) + } else { + relays = self.relays + } + + for relay in relays where !relay.is_broken { // don't try to reconnect to broken relays relay.connection.reconnect() } } func mark_broken(_ relay_id: String) { - for relay in relays { - relay.mark_broken() - } + relays.first(where: { $0.id == relay_id })?.mark_broken() } func connect(to: [String]? = nil) { @@ -146,8 +133,8 @@ class RelayPool { } } - func disconnect(to: [String]? = nil) { - let relays = to.map{ get_relays($0) } ?? self.relays + private func disconnect(from: [String]? = nil) { + let relays = from.map{ get_relays($0) } ?? self.relays for relay in relays { relay.connection.disconnect() } @@ -155,35 +142,23 @@ class RelayPool { func unsubscribe(sub_id: String, to: [String]? = nil) { if to == nil { - self.remove_handler(sub_id: sub_id) + remove_handler(sub_id: sub_id) } - self.send(.unsubscribe(sub_id), to: to) + send(.unsubscribe(sub_id), to: to) } - func subscribe(sub_id: String, filters: [NostrFilter], handler: @escaping (String, NostrConnectionEvent) -> (), to: [String]? = nil) { + func subscribe_to(sub_id: String, filters: [NostrFilter], to: [String]? = nil, handler: @escaping (String, NostrConnectionEvent) -> ()) { register_handler(sub_id: sub_id, handler: handler) send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to) } - func subscribe_to(sub_id: String, filters: [NostrFilter], to: [String]?, handler: @escaping (String, NostrConnectionEvent) -> ()) { - register_handler(sub_id: sub_id, handler: handler) - send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to) - } - - func count_queued(relay: String) -> Int { - var c = 0 - for request in request_queue { - if request.relay == relay { - c += 1 - } - } - - return c + private func count_queued(relay: String) -> Int { + request_queue.filter({ $0.relay == relay }).count } func queue_req(r: NostrRequest, relay: String) { let count = count_queued(relay: relay) - guard count <= 10 else { + guard count <= Constants.max_queued_requests else { print("can't queue, too many queued events for \(relay)") return } @@ -193,10 +168,10 @@ class RelayPool { } func send(_ req: NostrRequest, to: [String]? = nil) { - let relays = to.map{ get_relays($0) } ?? self.relays - + let relays = to.map { get_relays($0) } ?? self.relays + for relay in relays { - guard relay.connection.isConnected else { + guard relay.connection.state == .connected else { queue_req(r: req, relay: relay.id) continue } @@ -216,17 +191,14 @@ class RelayPool { func record_last_pong(relay_id: String, event: NostrConnectionEvent) { if case .ws_event(let ws_event) = event { if case .pong = ws_event { - for relay in relays { - if relay.id == relay_id { - relay.last_pong = UInt32(Date.now.timeIntervalSince1970) - return - } + if let relay = relays.first(where: { $0.id == relay_id }) { + relay.last_pong = UInt32(Date.now.timeIntervalSince1970) } } } } - func run_queue(_ relay_id: String) { + private func run_queue(_ relay_id: String) { self.request_queue = request_queue.reduce(into: Array()) { (q, req) in guard req.relay == relay_id else { q.append(req) @@ -238,23 +210,20 @@ class RelayPool { } } - func record_seen(relay_id: String, event: NostrConnectionEvent) { + private func record_seen(relay_id: String, event: NostrConnectionEvent) { if case .nostr_event(let ev) = event { if case .event(_, let nev) = ev { let k = relay_id + nev.id if !seen.contains(k) { seen.insert(k) - if counts[relay_id] == nil { - counts[relay_id] = 1 - } else { - counts[relay_id] = (counts[relay_id] ?? 0) + 1 - } + let prev_count = counts[relay_id] ?? 0 + counts[relay_id] = prev_count + 1 } } } } - func handle_event(relay_id: String, event: NostrConnectionEvent) { + private func handle_event(relay_id: String, event: NostrConnectionEvent) { record_last_pong(relay_id: relay_id, event: event) record_seen(relay_id: relay_id, event: event) @@ -276,5 +245,3 @@ func add_rw_relay(_ pool: RelayPool, _ url: String) { let url_ = URL(string: url)! try? pool.add_relay(url_, info: RelayInfo.rw) } - - diff --git a/damus/Views/Relays/RelayStatus.swift b/damus/Views/Relays/RelayStatus.swift index 47cada28..ec6aa774 100644 --- a/damus/Views/Relays/RelayStatus.swift +++ b/damus/Views/Relays/RelayStatus.swift @@ -7,6 +7,16 @@ import SwiftUI +extension RelayConnection.State { + var indicatorColor: Color { + switch self { + case .connected: return .green + case .connecting, .reconnecting: return .yellow + default: return .red + } + } +} + struct RelayStatus: View { let pool: RelayPool let relay: String @@ -16,18 +26,10 @@ struct RelayStatus: View { @State var conn_color: Color = .gray func update_connection_color() { - for relay in pool.relays { - if relay.id == self.relay { - let c = relay.connection - if c.isConnected { - conn_color = .green - } else if c.isConnecting || c.isReconnecting { - conn_color = .yellow - } else { - conn_color = .red - } - } + guard let relay = pool.relays.first(where: { $0.id == relay }) else { + return } + conn_color = relay.connection.state.indicatorColor } var body: some View {