diff --git a/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift b/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift index e6593fd4..28cff5ca 100644 --- a/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift +++ b/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift @@ -42,6 +42,7 @@ class NostrNetworkManagerTests: XCTestCase { func ensureSubscribeGetsAllExpectedNotes(filter: NostrFilter, expectedCount: Int) async { let endOfStream = XCTestExpectation(description: "Stream should receive EOSE") + let atLeastXEvents = XCTestExpectation(description: "Stream should get at least the expected number of notes") var receivedCount = 0 var eventIds: Set = [] Task { @@ -55,6 +56,9 @@ class NostrNetworkManagerTests: XCTestCase { } eventIds.insert(event.id) } + if eventIds.count >= expectedCount { + atLeastXEvents.fulfill() + } case .eose: continue case .ndbEose: @@ -66,7 +70,7 @@ class NostrNetworkManagerTests: XCTestCase { } } } - await fulfillment(of: [endOfStream], timeout: 15.0) + await fulfillment(of: [endOfStream, atLeastXEvents], timeout: 15.0) XCTAssertEqual(receivedCount, expectedCount, "Event IDs: \(eventIds.map({ $0.hex() }))") } diff --git a/nostrdb/Ndb.swift b/nostrdb/Ndb.swift index ea3e6ab6..aba61375 100644 --- a/nostrdb/Ndb.swift +++ b/nostrdb/Ndb.swift @@ -653,9 +653,9 @@ class Ndb { /// Safe wrapper around `ndb_subscribe` that handles all pointer management /// - Parameters: /// - filters: Array of NdbFilter objects - /// - Returns: AsyncStream of StreamItem events for new matches only - private func ndbSubscribe(filters: [NdbFilter]) -> AsyncStream { - return AsyncStream { continuation in + /// - Returns: AsyncStream of NoteKey events for new matches only + private func ndbSubscribe(filters: [NdbFilter]) -> AsyncStream { + return AsyncStream { continuation in // Allocate filters pointer - will be deallocated when subscription ends // Cannot use `defer` to deallocate `filtersPointer` because it needs to remain valid for the lifetime of the subscription, which extends beyond this block's scope. let filtersPointer = UnsafeMutablePointer.allocate(capacity: filters.count) @@ -676,7 +676,7 @@ class Ndb { // Clean up resources on early termination if subid != 0 { ndb_unsubscribe(self.ndb.ndb, subid) - Task { await self.unsetCallback(subscriptionId: subid) } + Task { await self.unsetContinuation(subscriptionId: subid) } } filtersPointer.deallocate() } @@ -688,11 +688,11 @@ class Ndb { // Set up subscription subid = ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count)) - // Set the subscription callback + // We are setting the continuation after issuing the subscription call. + // This won't cause lost notes because if any notes get issued before registering + // the continuation they will get queued by `Ndb.CallbackHandler` Task { - await self.setCallback(for: subid, callback: { noteKey in - continuation.yield(.event(noteKey)) - }) + await self.setContinuation(for: subid, continuation: continuation) } // Update termination handler to include subscription cleanup @@ -701,7 +701,7 @@ class Ndb { terminationStarted = true Log.debug("ndb_wait: stream: Terminated early", for: .ndb) streaming = false - Task { await self.unsetCallback(subscriptionId: subid) } + Task { await self.unsetContinuation(subscriptionId: subid) } filtersPointer.deallocate() guard !self.is_closed else { return } // Double-check Ndb is open before sending unsubscribe ndb_unsubscribe(self.ndb.ndb, subid) @@ -737,9 +737,9 @@ class Ndb { // Create a task to forward events from the subscription stream let forwardingTask = Task { - for await item in newEventsStream { - try Task.checkCancellation() - continuation.yield(item) + for await noteKey in newEventsStream { + if Task.isCancelled { break } + continuation.yield(.event(noteKey)) } continuation.finish() } @@ -840,11 +840,11 @@ class Ndb { // MARK: Internal ndb callback interfaces - internal func setCallback(for subscriptionId: UInt64, callback: @escaping (NoteKey) -> Void) async { - await self.callbackHandler.set(callback: callback, for: subscriptionId) + internal func setContinuation(for subscriptionId: UInt64, continuation: AsyncStream.Continuation) async { + await self.callbackHandler.set(continuation: continuation, for: subscriptionId) } - internal func unsetCallback(subscriptionId: UInt64) async { + internal func unsetContinuation(subscriptionId: UInt64) async { await self.callbackHandler.unset(subid: subscriptionId) } @@ -873,31 +873,60 @@ extension Ndb { actor CallbackHandler { /// Holds the ndb instance in the C codebase. Should be shared with `Ndb` var ndb: ndb_t? = nil - /// A map from nostrdb subscription ids to callbacks - var subscriptionCallbackMap: [UInt64: (NoteKey) -> Void] = [:] + /// A map from nostrdb subscription ids to stream continuations, which allows publishing note keys to active listeners + var subscriptionContinuationMap: [UInt64: AsyncStream.Continuation] = [:] + /// A map from nostrdb subscription ids to queued note keys (for when there are no active listeners) + var subscriptionQueueMap: [UInt64: [NoteKey]] = [:] + /// Maximum number of items to queue per subscription when no one is listening + let maxQueueItemsPerSubscription: Int = 2000 - func set(callback: @escaping (NoteKey) -> Void, for subid: UInt64) { - subscriptionCallbackMap[subid] = callback + func set(continuation: AsyncStream.Continuation, for subid: UInt64) { + // Flush any queued items to the new continuation + if let queuedItems = subscriptionQueueMap[subid] { + for noteKey in queuedItems { + continuation.yield(noteKey) + } + subscriptionQueueMap[subid] = nil + } + + subscriptionContinuationMap[subid] = continuation } func unset(subid: UInt64) { - subscriptionCallbackMap[subid] = nil + subscriptionContinuationMap[subid] = nil + subscriptionQueueMap[subid] = nil } func set(ndb: ndb_t?) { self.ndb = ndb } - /// Handles callbacks from nostrdb subscriptions, and routes them to the correct callback + /// Handles callbacks from nostrdb subscriptions, and routes them to the correct continuation or queue func handleSubscriptionCallback(subId: UInt64, maxCapacity: Int32 = 1000) { - if let callback = subscriptionCallbackMap[subId] { - let result = UnsafeMutablePointer.allocate(capacity: Int(maxCapacity)) - defer { result.deallocate() } // Ensure we deallocate memory before leaving the function to avoid memory leaks - if let ndb { - let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity) - for i in 0...allocate(capacity: Int(maxCapacity)) + defer { result.deallocate() } // Ensure we deallocate memory before leaving the function to avoid memory leaks + + guard let ndb else { return } + + let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity) + + for i in 0..= maxQueueItemsPerSubscription { + queue.removeFirst() } + + queue.append(noteKey) + subscriptionQueueMap[subId] = queue } } }