diff --git a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift index 9de44e8b..d9c6a236 100644 --- a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift @@ -32,17 +32,14 @@ extension NostrNetworkManager { /// - Returns: An async stream of nostr data func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream { return AsyncStream { continuation in - let streamTask = Task { - for await item in self.pool.subscribe(filters: filters, to: desiredRelays) { + let ndbStreamTask = Task { + for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) { switch item { - case .eose: continuation.yield(.eose) - case .event(let nostrEvent): - // At this point of the pipeline, if the note is valid it should have been processed and verified by NostrDB, - // in which case we should pull the note from NostrDB to ensure validity. - // However, NdbNotes are unowned, so we return a function where our callers can temporarily borrow the NostrDB note - let noteId = nostrEvent.id + case .eose: + continuation.yield(.eose) + case .event(let noteKey): let lender: NdbNoteLender = { lend in - guard let ndbNoteTxn = self.ndb.lookup_note(noteId) else { + guard let ndbNoteTxn = self.ndb.lookup_note_by_key(noteKey) else { throw NdbNoteLenderError.errorLoadingNote } guard let unownedNote = UnownedNdbNote(ndbNoteTxn) else { @@ -54,8 +51,15 @@ extension NostrNetworkManager { } } } + let streamTask = Task { + for await _ in self.pool.subscribe(filters: filters, to: desiredRelays) { + // NO-OP. Notes will be automatically ingested by NostrDB + // TODO: Improve efficiency of subscriptions? + } + } continuation.onTermination = { @Sendable _ in streamTask.cancel() // Close the RelayPool stream when caller stops streaming + ndbStreamTask.cancel() } } }