diff --git a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift index aaf0d0c7..1fdc7ba5 100644 --- a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift @@ -112,64 +112,7 @@ extension NostrNetworkManager { } } - /// Subscribes to data from the user's relays func advancedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { - return AsyncStream { continuation in - let subscriptionId = id ?? UUID() - let startTime = CFAbsoluteTimeGetCurrent() - Self.logger.info("Starting subscription \(subscriptionId.uuidString, privacy: .public): \(filters.debugDescription, privacy: .private)") - let multiSessionStreamingTask = Task { - while !Task.isCancelled { - do { - guard !self.ndb.is_closed else { - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Ndb closed. Sleeping for 1 second before resuming.") - try await Task.sleep(nanoseconds: 1_000_000_000) - continue - } - guard self.pool.open else { - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): RelayPool closed. Sleeping for 1 second before resuming.") - try await Task.sleep(nanoseconds: 1_000_000_000) - continue - } - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Streaming.") - for await item in self.sessionSubscribe(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { - try Task.checkCancellation() - continuation.yield(item) - } - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Session subscription ended. Sleeping for 1 second before resuming.") - try await Task.sleep(nanoseconds: 1_000_000_000) - } - catch { - Self.logger.error("Session subscription \(subscriptionId.uuidString, privacy: .public): Error: \(error.localizedDescription, privacy: .public)") - } - } - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Terminated.") - } - let timeoutTask = Task { - if let timeout { - try await Task.sleep(for: timeout) - continuation.finish() // End the stream due to timeout. - } - } - continuation.onTermination = { @Sendable _ in - Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Cancelled.") - multiSessionStreamingTask.cancel() - timeoutTask.cancel() - } - } - } - - /// Subscribes to data from the user's relays - /// - /// Only survives for a single session. This exits after the app is backgrounded - /// - /// ## Implementation notes - /// - /// - When we migrate to the local relay model, we should modify this function to stream directly from NostrDB - /// - /// - Parameter filters: The nostr filters to specify what kind of data to subscribe to - /// - Returns: An async stream of nostr data - private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { let id = id ?? UUID() let streamMode = streamMode ?? defaultStreamMode() return AsyncStream { continuation in @@ -194,36 +137,66 @@ extension NostrNetworkManager { } } - let ndbStreamTask = Task { - do { - for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) { + let streamTask = Task { + while !Task.isCancelled { + for await item in self.multiSessionNetworkStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { try Task.checkCancellation() switch item { + case .event(let lender): + continuation.yield(item) case .eose: - Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from nostrdb. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds") - continuation.yield(.ndbEose) - ndbEOSEIssued = true + break // Should not happen + case .ndbEose: + break // Should not happen + case .networkEose: + continuation.yield(item) + networkEOSEIssued = true yieldEOSEIfReady() - case .event(let noteKey): - let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey) - try Task.checkCancellation() - guard let desiredRelays else { - continuation.yield(.event(lender: lender)) // If no desired relays are specified, return all notes we see. - break - } - if try ndb.was(noteKey: noteKey, seenOnAnyOf: desiredRelays) { - continuation.yield(.event(lender: lender)) // If desired relays were specified and this note was seen there, return it. - } } } } - catch { - Self.logger.error("Session subscription \(id.uuidString, privacy: .public): NDB streaming error: \(error.localizedDescription, privacy: .public)") - } - Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): NDB streaming ended") - continuation.finish() } + + let ndbStreamTask = Task { + while !Task.isCancelled { + for await item in self.multiSessionNdbStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { + try Task.checkCancellation() + switch item { + case .event(let lender): + continuation.yield(item) + case .eose: + break // Should not happen + case .ndbEose: + continuation.yield(item) + ndbEOSEIssued = true + yieldEOSEIfReady() + case .networkEose: + break // Should not happen + } + } + } + } + + continuation.onTermination = { @Sendable _ in + streamTask.cancel() + ndbStreamTask.cancel() + } + } + } + + private func multiSessionNetworkStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { + let id = id ?? UUID() + let streamMode = streamMode ?? defaultStreamMode() + return AsyncStream { continuation in + let startTime = CFAbsoluteTimeGetCurrent() + Self.logger.debug("Network subscription \(id.uuidString, privacy: .public): Started") + let streamTask = Task { + while !self.pool.open { + Self.logger.info("\(id.uuidString, privacy: .public): RelayPool closed. Sleeping for 1 second before resuming.") + try await Task.sleep(nanoseconds: 1_000_000_000) + continue + } do { for await item in self.pool.subscribe(filters: filters, to: desiredRelays, id: id) { // NO-OP. Notes will be automatically ingested by NostrDB @@ -243,27 +216,98 @@ extension NostrNetworkManager { case .eose: Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from the network. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds") continuation.yield(.networkEose) - networkEOSEIssued = true - yieldEOSEIfReady() } } } catch { - Self.logger.error("Session subscription \(id.uuidString, privacy: .public): Network streaming error: \(error.localizedDescription, privacy: .public)") + Self.logger.error("Network subscription \(id.uuidString, privacy: .public): Streaming error: \(error.localizedDescription, privacy: .public)") } - Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Network streaming ended") + Self.logger.debug("Network subscription \(id.uuidString, privacy: .public): Network streaming ended") + continuation.finish() + } + + continuation.onTermination = { @Sendable _ in + streamTask.cancel() + } + } + } + + private func multiSessionNdbStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { + return AsyncStream { continuation in + let subscriptionId = id ?? UUID() + let startTime = CFAbsoluteTimeGetCurrent() + Self.logger.info("Starting multi-session NDB subscription \(subscriptionId.uuidString, privacy: .public): \(filters.debugDescription, privacy: .private)") + let multiSessionStreamingTask = Task { + while !Task.isCancelled { + do { + guard !self.ndb.is_closed else { + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Ndb closed. Sleeping for 1 second before resuming.") + try await Task.sleep(nanoseconds: 1_000_000_000) + continue + } + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Streaming from NDB.") + for await item in self.sessionNdbStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { + try Task.checkCancellation() + continuation.yield(item) + } + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Session subscription ended. Sleeping for 1 second before resuming.") + try await Task.sleep(nanoseconds: 1_000_000_000) + } + catch { + Self.logger.error("Session subscription \(subscriptionId.uuidString, privacy: .public): Error: \(error.localizedDescription, privacy: .public)") + } + } + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Terminated.") + } + continuation.onTermination = { @Sendable _ in + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Cancelled multi-session NDB stream.") + multiSessionStreamingTask.cancel() + } + } + } + + private func sessionNdbStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { + let id = id ?? UUID() + //let streamMode = streamMode ?? defaultStreamMode() + return AsyncStream { continuation in + let startTime = CFAbsoluteTimeGetCurrent() + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Started") + + let ndbStreamTask = Task { + do { + for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) { + try Task.checkCancellation() + switch item { + case .eose: + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from nostrdb. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds") + continuation.yield(.ndbEose) + case .event(let noteKey): + let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey) + try Task.checkCancellation() + guard let desiredRelays else { + continuation.yield(.event(lender: lender)) // If no desired relays are specified, return all notes we see. + break + } + if try ndb.was(noteKey: noteKey, seenOnAnyOf: desiredRelays) { + continuation.yield(.event(lender: lender)) // If desired relays were specified and this note was seen there, return it. + } + } + } + } + catch { + Self.logger.error("Session subscription \(id.uuidString, privacy: .public): NDB streaming error: \(error.localizedDescription, privacy: .public)") + } + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): NDB streaming ended") continuation.finish() } Task { // Add the ndb streaming task to the task manager so that it can be cancelled when the app is backgrounded let ndbStreamTaskId = await self.taskManager.add(task: ndbStreamTask) - let streamTaskId = await self.taskManager.add(task: streamTask) continuation.onTermination = { @Sendable _ in Task { await self.taskManager.cancelAndCleanUp(taskId: ndbStreamTaskId) - await self.taskManager.cancelAndCleanUp(taskId: streamTaskId) } } } diff --git a/damus/Core/Nostr/RelayPool.swift b/damus/Core/Nostr/RelayPool.swift index 31a0f65f..96226d16 100644 --- a/damus/Core/Nostr/RelayPool.swift +++ b/damus/Core/Nostr/RelayPool.swift @@ -444,6 +444,7 @@ class RelayPool { continue } + Log.debug("%s: Sending resubscribe request to %s", for: .networking, handler.sub_id, relayId.absoluteString) send(.subscribe(.init(filters: filters, sub_id: handler.sub_id)), to: [relayId], skip_ephemeral: shouldSkipEphemeralRelays) } }