diff --git a/damus/ContentView.swift b/damus/ContentView.swift
index 3dddfc96..46c468c1 100644
--- a/damus/ContentView.swift
+++ b/damus/ContentView.swift
@@ -538,6 +538,7 @@ struct ContentView: View {
             Task {
                 await damusClosingTask?.value // Wait for the closing task to finish before reopening things, to avoid race conditions
                 damusClosingTask = nil
+                damus_state.ndb.reopen()
                 // Pinging the network will automatically reconnect any dead websocket connections
                 damus_state.nostrNetwork.ping()
             }
diff --git a/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift b/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift
index 5e17c200..c9b3a393 100644
--- a/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift
+++ b/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift
@@ -61,6 +61,7 @@ class NostrNetworkManager {

     func handleAppBackgroundRequest() async {
         await self.reader.cancelAllTasks()
+        self.pool.cleanQueuedRequestForSessionEnd()
     }

     func close() async {
diff --git a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift
index 13420840..aaf0d0c7 100644
--- a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift
+++ b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift
@@ -220,6 +220,7 @@ extension NostrNetworkManager {
                 catch {
                     Self.logger.error("Session subscription \(id.uuidString, privacy: .public): NDB streaming error: \(error.localizedDescription, privacy: .public)")
                 }
+                Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): NDB streaming ended")
                 continuation.finish()
             }
             let streamTask = Task {
@@ -250,17 +251,19 @@
                 catch {
                     Self.logger.error("Session subscription \(id.uuidString, privacy: .public): Network streaming error: \(error.localizedDescription, privacy: .public)")
                 }
+                Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Network streaming ended")
                 continuation.finish()
             }

             Task {
                 // Add the ndb streaming task to the task manager so that it can be cancelled when the app is backgrounded
                 let ndbStreamTaskId = await self.taskManager.add(task: ndbStreamTask)
+                let streamTaskId = await self.taskManager.add(task: streamTask)

                 continuation.onTermination = { @Sendable _ in
                     Task {
                         await self.taskManager.cancelAndCleanUp(taskId: ndbStreamTaskId)
-                        streamTask.cancel()
+                        await self.taskManager.cancelAndCleanUp(taskId: streamTaskId)
                     }
                 }
             }
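Registering `streamTask` with the task manager, instead of cancelling it directly inside `onTermination`, means both the nostrdb stream and the network stream are torn down through the same path, whether the subscription terminates or `handleAppBackgroundRequest()` cancels everything. Below is a minimal sketch of that registry pattern; the actor name and storage are illustrative assumptions, and only the `add`/`cancelAndCleanUp`/`cancelAllTasks` method names come from the diff.

```swift
import Foundation

// Illustrative stand-in for the task manager used above; not the actual damus implementation.
actor StreamTaskRegistry {
    private var tasks: [UUID: Task<Void, Never>] = [:]

    /// Registers a running task and returns an id that can later be used to cancel it.
    func add(task: Task<Void, Never>) -> UUID {
        let id = UUID()
        tasks[id] = task
        return id
    }

    /// Cancels one task and removes it from the registry (e.g. when a subscription terminates).
    func cancelAndCleanUp(taskId: UUID) {
        tasks[taskId]?.cancel()
        tasks[taskId] = nil
    }

    /// Cancels everything, e.g. when the app is backgrounded.
    func cancelAllTasks() {
        for task in tasks.values { task.cancel() }
        tasks.removeAll()
    }
}
```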
diff --git a/damus/Core/Nostr/RelayPool.swift b/damus/Core/Nostr/RelayPool.swift
index 7a0d2eca..31a0f65f 100644
--- a/damus/Core/Nostr/RelayPool.swift
+++ b/damus/Core/Nostr/RelayPool.swift
@@ -10,6 +10,8 @@ import Network

 struct RelayHandler {
     let sub_id: String
+    let filters: [NostrFilter]?
+    let to: [RelayURL]?
     var callback: (RelayURL, NostrConnectionEvent) -> ()
 }

@@ -106,7 +108,7 @@ class RelayPool {
     }

     @MainActor
-    func register_handler(sub_id: String, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) async {
+    func register_handler(sub_id: String, filters: [NostrFilter]?, to relays: [RelayURL]? = nil, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) async {
         while handlers.count > Self.MAX_CONCURRENT_SUBSCRIPTION_LIMIT {
             Log.debug("%s: Too many subscriptions, waiting for subscription pool to clear", for: .networking, sub_id)
             try? await Task.sleep(for: .seconds(1))
@@ -121,7 +123,7 @@ class RelayPool {
                 return true
             }
         })
-        self.handlers.append(RelayHandler(sub_id: sub_id, callback: handler))
+        self.handlers.append(RelayHandler(sub_id: sub_id, filters: filters, to: relays, callback: handler))
         Log.debug("Registering %s handler, current: %d", for: .networking, sub_id, self.handlers.count)
     }

@@ -211,6 +213,23 @@ class RelayPool {
             relay.connection.disconnect()
         }
     }
+
+    /// Deletes queued-up requests that should not persist between app sessions (i.e. when the app goes to the background and then back to the foreground)
+    func cleanQueuedRequestForSessionEnd() {
+        request_queue = request_queue.filter { request in
+            guard case .typical(let typicalRequest) = request.req else { return true }
+            switch typicalRequest {
+            case .subscribe(_):
+                return true
+            case .unsubscribe(_):
+                return false // Do not persist unsubscribe requests, to prevent them from racing against subscribe requests when we come back to the foreground.
+            case .event(_):
+                return true
+            case .auth(_):
+                return true
+            }
+        }
+    }

     func unsubscribe(sub_id: String, to: [RelayURL]? = nil) {
         if to == nil {
@@ -221,7 +240,7 @@
     func subscribe(sub_id: String, filters: [NostrFilter], handler: @escaping (RelayURL, NostrConnectionEvent) -> (), to: [RelayURL]? = nil) {
         Task {
-            await register_handler(sub_id: sub_id, handler: handler)
+            await register_handler(sub_id: sub_id, filters: filters, to: to, handler: handler)

             // When the caller specifies no relays, it is implied that the user wants to use the ones in the user relay list. Skip ephemeral relays in that case.
             // When the caller specifies specific relays, do not skip ephemeral relays to respect the exact list given by the caller.
@@ -304,7 +323,7 @@
     func subscribe_to(sub_id: String, filters: [NostrFilter], to: [RelayURL]?, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) {
         Task {
-            await register_handler(sub_id: sub_id, handler: handler)
+            await register_handler(sub_id: sub_id, filters: filters, to: to, handler: handler)

             send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
         }
@@ -411,14 +430,34 @@
             }
         }
     }
+
+    func resubscribeAll(relayId: RelayURL) {
+        for handler in self.handlers {
+            guard let filters = handler.filters else { continue }
+
+            // When the caller specifies no relays, it is implied that the user wants to use the ones in the user relay list. Skip ephemeral relays in that case.
+            // When the caller specifies specific relays, do not skip ephemeral relays to respect the exact list given by the caller.
+            let shouldSkipEphemeralRelays = handler.to == nil ? true : false
+
+            if let handlerTargetRelays = handler.to,
+               !handlerTargetRelays.contains(where: { $0 == relayId }) {
+                // Not part of the target relays, skip
+                continue
+            }
+
+            send(.subscribe(.init(filters: filters, sub_id: handler.sub_id)), to: [relayId], skip_ephemeral: shouldSkipEphemeralRelays)
+        }
+    }

     func handle_event(relay_id: RelayURL, event: NostrConnectionEvent) {
         record_seen(relay_id: relay_id, event: event)

-        // run req queue when we reconnect
+        // When we reconnect, do two things:
+        // - Send messages that were stored in the queue
+        // - Re-subscribe to the filters we had subscribed to before
         if case .ws_connection_event(let ws) = event {
             if case .connected = ws {
                 run_queue(relay_id)
+                self.resubscribeAll(relayId: relay_id)
             }
         }
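Storing `filters` and `to` on `RelayHandler` is what makes `resubscribeAll(relayId:)` possible: when a relay's websocket reconnects, the pool can replay every handler's original filters on that relay, skipping handlers that never subscribed (`filters == nil`, such as "postbox" and "signup") and handlers pinned to other relays. A simplified, self-contained sketch of that selection logic follows, using stand-in types instead of damus' `RelayHandler`/`NostrFilter`/`RelayURL`.

```swift
// Stand-in types; the real code uses RelayHandler, NostrFilter and RelayURL.
struct Handler {
    let subId: String
    let filters: [String]?      // nil for pure event sinks that never subscribed
    let targetRelays: [String]? // nil means "use the user's relay list"
}

/// Decides which subscriptions to replay on the relay that just (re)connected.
func subscriptionsToReplay(on relay: String, handlers: [Handler]) -> [(subId: String, filters: [String])] {
    handlers.compactMap { handler -> (subId: String, filters: [String])? in
        // Nothing to replay for handlers without filters.
        guard let filters = handler.filters else { return nil }
        // Handlers pinned to specific relays are only replayed on those relays.
        if let targets = handler.targetRelays, !targets.contains(relay) { return nil }
        return (handler.subId, filters)
    }
}

// handle_event would then send(.subscribe(...)) for each of these on the reconnected relay.
let handlers = [
    Handler(subId: "home", filters: ["kinds: [1]"], targetRelays: nil),
    Handler(subId: "postbox", filters: nil, targetRelays: nil),
    Handler(subId: "dm-inbox", filters: ["kinds: [4]"], targetRelays: ["wss://relay.damus.io"]),
]
let toReplay = subscriptionsToReplay(on: "wss://relay.damus.io", handlers: handlers)
// -> "home" and "dm-inbox"; "postbox" is skipped because it has no filters.
```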
diff --git a/damus/Features/Onboarding/Views/SaveKeysView.swift b/damus/Features/Onboarding/Views/SaveKeysView.swift
index 4a2bf947..9939d778 100644
--- a/damus/Features/Onboarding/Views/SaveKeysView.swift
+++ b/damus/Features/Onboarding/Views/SaveKeysView.swift
@@ -142,7 +142,7 @@ struct SaveKeysView: View {
             add_rw_relay(self.pool, relay)
         }

-        Task { await self.pool.register_handler(sub_id: "signup", handler: handle_event) }
+        Task { await self.pool.register_handler(sub_id: "signup", filters: nil, handler: handle_event) }

         self.loading = true
diff --git a/damus/Features/Posting/Models/PostBox.swift b/damus/Features/Posting/Models/PostBox.swift
index 0f84038c..db5bb3b8 100644
--- a/damus/Features/Posting/Models/PostBox.swift
+++ b/damus/Features/Posting/Models/PostBox.swift
@@ -60,7 +60,7 @@ class PostBox {
     init(pool: RelayPool) {
         self.pool = pool
         self.events = [:]
-        Task { await pool.register_handler(sub_id: "postbox", handler: handle_event) }
+        Task { await pool.register_handler(sub_id: "postbox", filters: nil, to: nil, handler: handle_event) }
    }

    // only works reliably on delay-sent events
diff --git a/nostrdb/Ndb.swift b/nostrdb/Ndb.swift
index 0dabf95d..ea3e6ab6 100644
--- a/nostrdb/Ndb.swift
+++ b/nostrdb/Ndb.swift
@@ -701,9 +701,10 @@ class Ndb {
                 terminationStarted = true
                 Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
                 streaming = false
-                ndb_unsubscribe(self.ndb.ndb, subid)
                 Task { await self.unsetCallback(subscriptionId: subid) }
                 filtersPointer.deallocate()
+                guard !self.is_closed else { return } // Double-check Ndb is open before sending unsubscribe
+                ndb_unsubscribe(self.ndb.ndb, subid)
            }
        }
    }
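The Ndb change reorders teardown so that the Swift-side cleanup (unsetting the callback, deallocating the filters pointer) always runs, while the call into the C library is skipped once the database has been closed, for instance because the app went to the background before this termination handler fired. A tiny sketch of that guard pattern is below, with hypothetical names standing in for `Ndb`, `is_closed` and `ndb_unsubscribe`.

```swift
// Hypothetical wrapper over a native handle; illustrates "clean up Swift state first,
// only touch the native handle if it is still open". Not the actual Ndb implementation.
final class NativeSubscriptionStream {
    private(set) var isClosed = false
    private var callback: (() -> Void)?

    func close() { isClosed = true }

    func endStream(subId: UInt64) {
        // Swift-side cleanup is always safe, even when the database handle is closed.
        callback = nil

        // Double-check the handle is still open before calling into the C library.
        guard !isClosed else { return }
        nativeUnsubscribe(subId)
    }

    private func nativeUnsubscribe(_ subId: UInt64) {
        // Would call ndb_unsubscribe(handle, subId) here.
    }
}
```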