From 7afcaa99fe135815fb5bb92d7d1ee9d185f5f7c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20D=E2=80=99Aquino?= Date: Fri, 24 Oct 2025 16:22:26 -0700 Subject: [PATCH] Reduce race condition probability in Ndb streaming functions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This attempts to reduce race conditions coming from Ndb streaming functions that could lead to lost notes or crashes. It does so by making two improvements: 1. Instead of callbacks, now the callback handler uses async streams, which reduces the chances of a callback being called before the last item was processed by the consumer. 2. The callback handler will now queue up received notes if there are no listeners yet. This is helpful because we need to issue the subscribe call to nostrdb before getting the subscription id and setting up a listener, but in between that time nostrdb may still send notes which would effectively get dropped without this queuing mechanism. Changelog-Fixed: Improved robustness in the part of the code that streams notes from nostrdb Signed-off-by: Daniel D’Aquino --- .../NostrNetworkManagerTests.swift | 6 +- nostrdb/Ndb.swift | 85 +++++++++++++------ 2 files changed, 62 insertions(+), 29 deletions(-) diff --git a/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift b/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift index e6593fd4..28cff5ca 100644 --- a/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift +++ b/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift @@ -42,6 +42,7 @@ class NostrNetworkManagerTests: XCTestCase { func ensureSubscribeGetsAllExpectedNotes(filter: NostrFilter, expectedCount: Int) async { let endOfStream = XCTestExpectation(description: "Stream should receive EOSE") + let atLeastXEvents = XCTestExpectation(description: "Stream should get at least the expected number of notes") var receivedCount = 0 var eventIds: Set = [] Task { @@ -55,6 +56,9 @@ class NostrNetworkManagerTests: XCTestCase { } eventIds.insert(event.id) } + if eventIds.count >= expectedCount { + atLeastXEvents.fulfill() + } case .eose: continue case .ndbEose: @@ -66,7 +70,7 @@ class NostrNetworkManagerTests: XCTestCase { } } } - await fulfillment(of: [endOfStream], timeout: 15.0) + await fulfillment(of: [endOfStream, atLeastXEvents], timeout: 15.0) XCTAssertEqual(receivedCount, expectedCount, "Event IDs: \(eventIds.map({ $0.hex() }))") } diff --git a/nostrdb/Ndb.swift b/nostrdb/Ndb.swift index ea3e6ab6..aba61375 100644 --- a/nostrdb/Ndb.swift +++ b/nostrdb/Ndb.swift @@ -653,9 +653,9 @@ class Ndb { /// Safe wrapper around `ndb_subscribe` that handles all pointer management /// - Parameters: /// - filters: Array of NdbFilter objects - /// - Returns: AsyncStream of StreamItem events for new matches only - private func ndbSubscribe(filters: [NdbFilter]) -> AsyncStream { - return AsyncStream { continuation in + /// - Returns: AsyncStream of NoteKey events for new matches only + private func ndbSubscribe(filters: [NdbFilter]) -> AsyncStream { + return AsyncStream { continuation in // Allocate filters pointer - will be deallocated when subscription ends // Cannot use `defer` to deallocate `filtersPointer` because it needs to remain valid for the lifetime of the subscription, which extends beyond this block's scope. let filtersPointer = UnsafeMutablePointer.allocate(capacity: filters.count) @@ -676,7 +676,7 @@ class Ndb { // Clean up resources on early termination if subid != 0 { ndb_unsubscribe(self.ndb.ndb, subid) - Task { await self.unsetCallback(subscriptionId: subid) } + Task { await self.unsetContinuation(subscriptionId: subid) } } filtersPointer.deallocate() } @@ -688,11 +688,11 @@ class Ndb { // Set up subscription subid = ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count)) - // Set the subscription callback + // We are setting the continuation after issuing the subscription call. + // This won't cause lost notes because if any notes get issued before registering + // the continuation they will get queued by `Ndb.CallbackHandler` Task { - await self.setCallback(for: subid, callback: { noteKey in - continuation.yield(.event(noteKey)) - }) + await self.setContinuation(for: subid, continuation: continuation) } // Update termination handler to include subscription cleanup @@ -701,7 +701,7 @@ class Ndb { terminationStarted = true Log.debug("ndb_wait: stream: Terminated early", for: .ndb) streaming = false - Task { await self.unsetCallback(subscriptionId: subid) } + Task { await self.unsetContinuation(subscriptionId: subid) } filtersPointer.deallocate() guard !self.is_closed else { return } // Double-check Ndb is open before sending unsubscribe ndb_unsubscribe(self.ndb.ndb, subid) @@ -737,9 +737,9 @@ class Ndb { // Create a task to forward events from the subscription stream let forwardingTask = Task { - for await item in newEventsStream { - try Task.checkCancellation() - continuation.yield(item) + for await noteKey in newEventsStream { + if Task.isCancelled { break } + continuation.yield(.event(noteKey)) } continuation.finish() } @@ -840,11 +840,11 @@ class Ndb { // MARK: Internal ndb callback interfaces - internal func setCallback(for subscriptionId: UInt64, callback: @escaping (NoteKey) -> Void) async { - await self.callbackHandler.set(callback: callback, for: subscriptionId) + internal func setContinuation(for subscriptionId: UInt64, continuation: AsyncStream.Continuation) async { + await self.callbackHandler.set(continuation: continuation, for: subscriptionId) } - internal func unsetCallback(subscriptionId: UInt64) async { + internal func unsetContinuation(subscriptionId: UInt64) async { await self.callbackHandler.unset(subid: subscriptionId) } @@ -873,31 +873,60 @@ extension Ndb { actor CallbackHandler { /// Holds the ndb instance in the C codebase. Should be shared with `Ndb` var ndb: ndb_t? = nil - /// A map from nostrdb subscription ids to callbacks - var subscriptionCallbackMap: [UInt64: (NoteKey) -> Void] = [:] + /// A map from nostrdb subscription ids to stream continuations, which allows publishing note keys to active listeners + var subscriptionContinuationMap: [UInt64: AsyncStream.Continuation] = [:] + /// A map from nostrdb subscription ids to queued note keys (for when there are no active listeners) + var subscriptionQueueMap: [UInt64: [NoteKey]] = [:] + /// Maximum number of items to queue per subscription when no one is listening + let maxQueueItemsPerSubscription: Int = 2000 - func set(callback: @escaping (NoteKey) -> Void, for subid: UInt64) { - subscriptionCallbackMap[subid] = callback + func set(continuation: AsyncStream.Continuation, for subid: UInt64) { + // Flush any queued items to the new continuation + if let queuedItems = subscriptionQueueMap[subid] { + for noteKey in queuedItems { + continuation.yield(noteKey) + } + subscriptionQueueMap[subid] = nil + } + + subscriptionContinuationMap[subid] = continuation } func unset(subid: UInt64) { - subscriptionCallbackMap[subid] = nil + subscriptionContinuationMap[subid] = nil + subscriptionQueueMap[subid] = nil } func set(ndb: ndb_t?) { self.ndb = ndb } - /// Handles callbacks from nostrdb subscriptions, and routes them to the correct callback + /// Handles callbacks from nostrdb subscriptions, and routes them to the correct continuation or queue func handleSubscriptionCallback(subId: UInt64, maxCapacity: Int32 = 1000) { - if let callback = subscriptionCallbackMap[subId] { - let result = UnsafeMutablePointer.allocate(capacity: Int(maxCapacity)) - defer { result.deallocate() } // Ensure we deallocate memory before leaving the function to avoid memory leaks - if let ndb { - let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity) - for i in 0...allocate(capacity: Int(maxCapacity)) + defer { result.deallocate() } // Ensure we deallocate memory before leaving the function to avoid memory leaks + + guard let ndb else { return } + + let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity) + + for i in 0..= maxQueueItemsPerSubscription { + queue.removeFirst() } + + queue.append(noteKey) + subscriptionQueueMap[subId] = queue } } }