From c4c3656f906190fdf1eede1b776b16757eed2978 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20D=E2=80=99Aquino?= Date: Wed, 27 Aug 2025 18:59:20 -0700 Subject: [PATCH] Multi-session subscriptions and RelayPool reopening MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit implements nostr network subscriptions that survive between sessions, as well as improved handling of RelayPool opening/closing with respect to the app lifecycle. This prevents stale data after users swap out and back into Damus. Signed-off-by: Daniel D’Aquino --- damus/ContentView.swift | 1 + damus/Core/NIPs/NIP65/NIP65.swift | 4 ++ .../NostrNetworkManager.swift | 1 + .../SubscriptionManager.swift | 48 +++++++++++++++++++ damus/Core/Nostr/RelayPool.swift | 3 ++ .../NostrNetworkManagerTests.swift | 2 + .../ThreadModelTests.swift | 2 + nostrdb/Ndb.swift | 1 + 8 files changed, 62 insertions(+) diff --git a/damus/ContentView.swift b/damus/ContentView.swift index 6d72fbdf..e87251ff 100644 --- a/damus/ContentView.swift +++ b/damus/ContentView.swift @@ -521,6 +521,7 @@ struct ContentView: View { break case .active: print("txn: 📙 DAMUS ACTIVE") + damus_state.nostrNetwork.connect() damus_state.nostrNetwork.ping() @unknown default: break diff --git a/damus/Core/NIPs/NIP65/NIP65.swift b/damus/Core/NIPs/NIP65/NIP65.swift index 13c9bcd6..21af546c 100644 --- a/damus/Core/NIPs/NIP65/NIP65.swift +++ b/damus/Core/NIPs/NIP65/NIP65.swift @@ -42,6 +42,10 @@ extension NIP65 { self.relays = Self.relayOrderedDictionary(from: relays) } + init() { + self.relays = Self.relayOrderedDictionary(from: []) + } + init(relays: [RelayURL]) { let relayItemList = relays.map({ RelayItem(url: $0, rwConfiguration: .readWrite) }) self.relays = Self.relayOrderedDictionary(from: relayItemList) diff --git a/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift b/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift index 744513b5..daa60183 100644 --- a/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift @@ -50,6 +50,7 @@ class NostrNetworkManager { /// Connects the app to the Nostr network func connect() { self.userRelayList.connect() + self.pool.open = true } func disconnect() { diff --git a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift index 08226f4c..0ee285e3 100644 --- a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift @@ -34,6 +34,54 @@ extension NostrNetworkManager { /// - Parameter filters: The nostr filters to specify what kind of data to subscribe to /// - Returns: An async stream of nostr data func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream { + return AsyncStream { continuation in + let subscriptionId = UUID() + Log.info("Starting subscription %s: %s", for: .subscription_manager, subscriptionId.uuidString, filters.debugDescription) + let multiSessionStreamingTask = Task { + while !Task.isCancelled { + do { + guard !self.ndb.is_closed else { + Log.info("%s: Ndb closed. Sleeping for 1 second before resuming.", for: .subscription_manager, subscriptionId.uuidString) + try await Task.sleep(nanoseconds: 1_000_000_000) + continue + } + guard self.pool.open else { + Log.info("%s: RelayPool closed. Sleeping for 1 second before resuming.", for: .subscription_manager, subscriptionId.uuidString) + try await Task.sleep(nanoseconds: 1_000_000_000) + continue + } + Log.info("%s: Streaming.", for: .subscription_manager, subscriptionId.uuidString) + for await item in self.sessionSubscribe(filters: filters, to: desiredRelays) { + try Task.checkCancellation() + continuation.yield(item) + } + Log.info("%s: Session subscription ended. Sleeping for 1 second before resuming.", for: .subscription_manager, subscriptionId.uuidString) + try await Task.sleep(nanoseconds: 1_000_000_000) + } + catch { + Log.error("%s: Error: %s", for: .subscription_manager, subscriptionId.uuidString, error.localizedDescription) + } + } + Log.info("%s: Terminated.", for: .subscription_manager, subscriptionId.uuidString) + } + continuation.onTermination = { @Sendable _ in + Log.info("%s: Cancelled.", for: .subscription_manager, subscriptionId.uuidString) + multiSessionStreamingTask.cancel() + } + } + } + + /// Subscribes to data from the user's relays + /// + /// Only survives for a single session. This exits after the app is backgrounded + /// + /// ## Implementation notes + /// + /// - When we migrate to the local relay model, we should modify this function to stream directly from NostrDB + /// + /// - Parameter filters: The nostr filters to specify what kind of data to subscribe to + /// - Returns: An async stream of nostr data + private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream { return AsyncStream { continuation in let ndbStreamTask = Task { do { diff --git a/damus/Core/Nostr/RelayPool.swift b/damus/Core/Nostr/RelayPool.swift index 4026bebd..6f3f7443 100644 --- a/damus/Core/Nostr/RelayPool.swift +++ b/damus/Core/Nostr/RelayPool.swift @@ -27,6 +27,7 @@ struct SeenEvent: Hashable { /// Establishes and manages connections and subscriptions to a list of relays. class RelayPool { private(set) var relays: [Relay] = [] + var open: Bool = false var handlers: [RelayHandler] = [] var request_queue: [QueuedRequest] = [] var seen: [NoteId: Set] = [:] @@ -46,6 +47,7 @@ class RelayPool { func close() { disconnect() relays = [] + open = false handlers = [] request_queue = [] seen.removeAll() @@ -181,6 +183,7 @@ class RelayPool { } func connect(to: [RelayURL]? = nil) { + open = true let relays = to.map{ get_relays($0) } ?? self.relays for relay in relays { relay.connection.connect() diff --git a/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift b/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift index cb92ccf2..244b4061 100644 --- a/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift +++ b/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift @@ -15,6 +15,8 @@ class NostrNetworkManagerTests: XCTestCase { override func setUpWithError() throws { // Put setup code here. This method is called before the invocation of each test method in the class. damusState = generate_test_damus_state(mock_profile_info: nil) + try! damusState?.nostrNetwork.userRelayList.set(userRelayList: NIP65.RelayList()) + damusState?.nostrNetwork.connect() let notesJSONL = getTestNotesJSONL() diff --git a/damusTests/NostrNetworkManagerTests/ThreadModelTests.swift b/damusTests/NostrNetworkManagerTests/ThreadModelTests.swift index ee6b582c..d0824b14 100644 --- a/damusTests/NostrNetworkManagerTests/ThreadModelTests.swift +++ b/damusTests/NostrNetworkManagerTests/ThreadModelTests.swift @@ -15,6 +15,8 @@ final class ThreadModelTests: XCTestCase { override func setUpWithError() throws { // Put setup code here. This method is called before the invocation of each test method in the class. damusState = generate_test_damus_state(mock_profile_info: nil) + try! damusState?.nostrNetwork.userRelayList.set(userRelayList: NIP65.RelayList()) + damusState?.nostrNetwork.connect() let notesJSONL = getTestNotesJSONL() diff --git a/nostrdb/Ndb.swift b/nostrdb/Ndb.swift index e1754d56..f0de8f12 100644 --- a/nostrdb/Ndb.swift +++ b/nostrdb/Ndb.swift @@ -712,6 +712,7 @@ class Ndb { return AsyncStream { continuation in // Stream all results already present in the database for noteId in noteIds { + if Task.isCancelled { return } continuation.yield(.event(noteId)) }