From d9306d415323abaa0107dfd98cf2e521b03a67dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20D=E2=80=99Aquino?= Date: Sun, 5 Oct 2025 15:21:57 -0700 Subject: [PATCH] Modify NostrNetworkManager pipeline architecture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, we combined the ndb and network stream within a "session subscription" stream, which was teared down and rebuilt every time the app went into the background and back to the foreground (This was done to prevent crashes related to access to Ndb memory when Ndb is closed). However, this caused complications and instability on the network stream, leading to timeline staleness. To address this, the pipeline was modified to merge the ndb and network streams further upstream, on the multi-session stage, allowing the session subscription streams to be completely split between Ndb and the network. For the ndb stream, we still tear it down and bring it up along the app foreground state, to prevent memory crashes. However, the network stream is kept intact between sessions, since RelayPool will now automatically handle resubscription on websocket reconnection. This prevents complexity and potential race conditions that could lead to timeline staleness. Signed-off-by: Daniel D’Aquino --- .../SubscriptionManager.swift | 212 +++++++++++------- damus/Core/Nostr/RelayPool.swift | 1 + 2 files changed, 129 insertions(+), 84 deletions(-) diff --git a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift index aaf0d0c7..1fdc7ba5 100644 --- a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift @@ -112,64 +112,7 @@ extension NostrNetworkManager { } } - /// Subscribes to data from the user's relays func advancedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { - return AsyncStream { continuation in - let subscriptionId = id ?? UUID() - let startTime = CFAbsoluteTimeGetCurrent() - Self.logger.info("Starting subscription \(subscriptionId.uuidString, privacy: .public): \(filters.debugDescription, privacy: .private)") - let multiSessionStreamingTask = Task { - while !Task.isCancelled { - do { - guard !self.ndb.is_closed else { - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Ndb closed. Sleeping for 1 second before resuming.") - try await Task.sleep(nanoseconds: 1_000_000_000) - continue - } - guard self.pool.open else { - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): RelayPool closed. Sleeping for 1 second before resuming.") - try await Task.sleep(nanoseconds: 1_000_000_000) - continue - } - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Streaming.") - for await item in self.sessionSubscribe(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { - try Task.checkCancellation() - continuation.yield(item) - } - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Session subscription ended. Sleeping for 1 second before resuming.") - try await Task.sleep(nanoseconds: 1_000_000_000) - } - catch { - Self.logger.error("Session subscription \(subscriptionId.uuidString, privacy: .public): Error: \(error.localizedDescription, privacy: .public)") - } - } - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Terminated.") - } - let timeoutTask = Task { - if let timeout { - try await Task.sleep(for: timeout) - continuation.finish() // End the stream due to timeout. - } - } - continuation.onTermination = { @Sendable _ in - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Cancelled.") - multiSessionStreamingTask.cancel() - timeoutTask.cancel() - } - } - } - - /// Subscribes to data from the user's relays - /// - /// Only survives for a single session. This exits after the app is backgrounded - /// - /// ## Implementation notes - /// - /// - When we migrate to the local relay model, we should modify this function to stream directly from NostrDB - /// - /// - Parameter filters: The nostr filters to specify what kind of data to subscribe to - /// - Returns: An async stream of nostr data - private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { let id = id ?? UUID() let streamMode = streamMode ?? defaultStreamMode() return AsyncStream { continuation in @@ -194,36 +137,66 @@ extension NostrNetworkManager { } } - let ndbStreamTask = Task { - do { - for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) { + let streamTask = Task { + while !Task.isCancelled { + for await item in self.multiSessionNetworkStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { try Task.checkCancellation() switch item { + case .event(let lender): + continuation.yield(item) case .eose: - Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from nostrdb. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds") - continuation.yield(.ndbEose) - ndbEOSEIssued = true + break // Should not happen + case .ndbEose: + break // Should not happen + case .networkEose: + continuation.yield(item) + networkEOSEIssued = true yieldEOSEIfReady() - case .event(let noteKey): - let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey) - try Task.checkCancellation() - guard let desiredRelays else { - continuation.yield(.event(lender: lender)) // If no desired relays are specified, return all notes we see. - break - } - if try ndb.was(noteKey: noteKey, seenOnAnyOf: desiredRelays) { - continuation.yield(.event(lender: lender)) // If desired relays were specified and this note was seen there, return it. - } } } } - catch { - Self.logger.error("Session subscription \(id.uuidString, privacy: .public): NDB streaming error: \(error.localizedDescription, privacy: .public)") - } - Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): NDB streaming ended") - continuation.finish() } + + let ndbStreamTask = Task { + while !Task.isCancelled { + for await item in self.multiSessionNdbStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { + try Task.checkCancellation() + switch item { + case .event(let lender): + continuation.yield(item) + case .eose: + break // Should not happen + case .ndbEose: + continuation.yield(item) + ndbEOSEIssued = true + yieldEOSEIfReady() + case .networkEose: + break // Should not happen + } + } + } + } + + continuation.onTermination = { @Sendable _ in + streamTask.cancel() + ndbStreamTask.cancel() + } + } + } + + private func multiSessionNetworkStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { + let id = id ?? UUID() + let streamMode = streamMode ?? defaultStreamMode() + return AsyncStream { continuation in + let startTime = CFAbsoluteTimeGetCurrent() + Self.logger.debug("Network subscription \(id.uuidString, privacy: .public): Started") + let streamTask = Task { + while !self.pool.open { + Self.logger.info("\(id.uuidString, privacy: .public): RelayPool closed. Sleeping for 1 second before resuming.") + try await Task.sleep(nanoseconds: 1_000_000_000) + continue + } do { for await item in self.pool.subscribe(filters: filters, to: desiredRelays, id: id) { // NO-OP. Notes will be automatically ingested by NostrDB @@ -243,27 +216,98 @@ extension NostrNetworkManager { case .eose: Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from the network. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds") continuation.yield(.networkEose) - networkEOSEIssued = true - yieldEOSEIfReady() } } } catch { - Self.logger.error("Session subscription \(id.uuidString, privacy: .public): Network streaming error: \(error.localizedDescription, privacy: .public)") + Self.logger.error("Network subscription \(id.uuidString, privacy: .public): Streaming error: \(error.localizedDescription, privacy: .public)") } - Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Network streaming ended") + Self.logger.debug("Network subscription \(id.uuidString, privacy: .public): Network streaming ended") + continuation.finish() + } + + continuation.onTermination = { @Sendable _ in + streamTask.cancel() + } + } + } + + private func multiSessionNdbStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { + return AsyncStream { continuation in + let subscriptionId = id ?? UUID() + let startTime = CFAbsoluteTimeGetCurrent() + Self.logger.info("Starting multi-session NDB subscription \(subscriptionId.uuidString, privacy: .public): \(filters.debugDescription, privacy: .private)") + let multiSessionStreamingTask = Task { + while !Task.isCancelled { + do { + guard !self.ndb.is_closed else { + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Ndb closed. Sleeping for 1 second before resuming.") + try await Task.sleep(nanoseconds: 1_000_000_000) + continue + } + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Streaming from NDB.") + for await item in self.sessionNdbStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { + try Task.checkCancellation() + continuation.yield(item) + } + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Session subscription ended. Sleeping for 1 second before resuming.") + try await Task.sleep(nanoseconds: 1_000_000_000) + } + catch { + Self.logger.error("Session subscription \(subscriptionId.uuidString, privacy: .public): Error: \(error.localizedDescription, privacy: .public)") + } + } + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Terminated.") + } + continuation.onTermination = { @Sendable _ in + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Cancelled multi-session NDB stream.") + multiSessionStreamingTask.cancel() + } + } + } + + private func sessionNdbStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { + let id = id ?? UUID() + //let streamMode = streamMode ?? defaultStreamMode() + return AsyncStream { continuation in + let startTime = CFAbsoluteTimeGetCurrent() + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Started") + + let ndbStreamTask = Task { + do { + for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) { + try Task.checkCancellation() + switch item { + case .eose: + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from nostrdb. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds") + continuation.yield(.ndbEose) + case .event(let noteKey): + let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey) + try Task.checkCancellation() + guard let desiredRelays else { + continuation.yield(.event(lender: lender)) // If no desired relays are specified, return all notes we see. + break + } + if try ndb.was(noteKey: noteKey, seenOnAnyOf: desiredRelays) { + continuation.yield(.event(lender: lender)) // If desired relays were specified and this note was seen there, return it. + } + } + } + } + catch { + Self.logger.error("Session subscription \(id.uuidString, privacy: .public): NDB streaming error: \(error.localizedDescription, privacy: .public)") + } + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): NDB streaming ended") continuation.finish() } Task { // Add the ndb streaming task to the task manager so that it can be cancelled when the app is backgrounded let ndbStreamTaskId = await self.taskManager.add(task: ndbStreamTask) - let streamTaskId = await self.taskManager.add(task: streamTask) continuation.onTermination = { @Sendable _ in Task { await self.taskManager.cancelAndCleanUp(taskId: ndbStreamTaskId) - await self.taskManager.cancelAndCleanUp(taskId: streamTaskId) } } } diff --git a/damus/Core/Nostr/RelayPool.swift b/damus/Core/Nostr/RelayPool.swift index 31a0f65f..96226d16 100644 --- a/damus/Core/Nostr/RelayPool.swift +++ b/damus/Core/Nostr/RelayPool.swift @@ -444,6 +444,7 @@ class RelayPool { continue } + Log.debug("%s: Sending resubscribe request to %s", for: .networking, handler.sub_id, relayId.absoluteString) send(.subscribe(.init(filters: filters, sub_id: handler.sub_id)), to: [relayId], skip_ephemeral: shouldSkipEphemeralRelays) } }