diff --git a/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift b/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift index e0084677..431ca8af 100644 --- a/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift +++ b/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift @@ -129,6 +129,7 @@ class NostrNetworkManagerTests: XCTestCase { var count = 0 var receivedIds = Set() let subscribeExpectation = XCTestExpectation(description: "Should receive all events and EOSE") + let atLeastXNotes = XCTestExpectation(description: "Should get at least the expected amount of notes") Task { do { @@ -142,6 +143,9 @@ class NostrNetworkManagerTests: XCTestCase { receivedIds.insert(note.id) } } + if count >= expectedCount { + atLeastXNotes.fulfill() + } case .eose: // End of stored events subscribeExpectation.fulfill() @@ -153,7 +157,7 @@ class NostrNetworkManagerTests: XCTestCase { } } - await fulfillment(of: [subscribeExpectation], timeout: 10.0) + await fulfillment(of: [subscribeExpectation, atLeastXNotes], timeout: 10.0) // Verify we received exactly the expected number of unique events XCTAssertEqual(count, expectedCount, "Should receive all \(expectedCount) events") diff --git a/nostrdb/Ndb.swift b/nostrdb/Ndb.swift index aba61375..e16daaa3 100644 --- a/nostrdb/Ndb.swift +++ b/nostrdb/Ndb.swift @@ -691,7 +691,7 @@ class Ndb { // We are setting the continuation after issuing the subscription call. // This won't cause lost notes because if any notes get issued before registering // the continuation they will get queued by `Ndb.CallbackHandler` - Task { + let continuationSetupTask = Task { await self.setContinuation(for: subid, continuation: continuation) } @@ -701,6 +701,7 @@ class Ndb { terminationStarted = true Log.debug("ndb_wait: stream: Terminated early", for: .ndb) streaming = false + continuationSetupTask.cancel() Task { await self.unsetContinuation(subscriptionId: subid) } filtersPointer.deallocate() guard !self.is_closed else { return } // Double-check Ndb is open before sending unsubscribe @@ -711,19 +712,21 @@ class Ndb { func subscribe(filters: [NdbFilter], maxSimultaneousResults: Int = 1000) throws(NdbStreamError) -> AsyncStream { guard !self.is_closed else { throw .ndbClosed } - // Fetch initial results - guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction } do { try Task.checkCancellation() } catch { throw .cancelled } + // CRITICAL: Create the subscription FIRST before querying to avoid race condition + // This ensures that any events indexed after subscription but before query won't be missed + let newEventsStream = ndbSubscribe(filters: filters) + + // Now fetch initial results after subscription is registered + guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction } + // Use our safe wrapper instead of direct C function call let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults) do { try Task.checkCancellation() } catch { throw .cancelled } - // Create a subscription for new events - let newEventsStream = ndbSubscribe(filters: filters) - // Create a cascading stream that combines initial results with new events return AsyncStream { continuation in // Stream all results already present in the database