Fix race condition leading to intermittent issues with ndb streaming and related tests
A race condition was identified where notes would get dropped if they get indexed in the time window between when a query is made and the subscription is made. The issue was fixed by making the subscribe call before making the query call, to ensure we get all notes from that time when we perform the query. This dropped the failure rate for ndb subscription tests from about 20% down to about 4%. Local relay model issue was not publicly released, which is why the changelog entry is "none". Changelog-None Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
@@ -129,6 +129,7 @@ class NostrNetworkManagerTests: XCTestCase {
|
|||||||
var count = 0
|
var count = 0
|
||||||
var receivedIds = Set<NoteId>()
|
var receivedIds = Set<NoteId>()
|
||||||
let subscribeExpectation = XCTestExpectation(description: "Should receive all events and EOSE")
|
let subscribeExpectation = XCTestExpectation(description: "Should receive all events and EOSE")
|
||||||
|
let atLeastXNotes = XCTestExpectation(description: "Should get at least the expected amount of notes")
|
||||||
|
|
||||||
Task {
|
Task {
|
||||||
do {
|
do {
|
||||||
@@ -142,6 +143,9 @@ class NostrNetworkManagerTests: XCTestCase {
|
|||||||
receivedIds.insert(note.id)
|
receivedIds.insert(note.id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if count >= expectedCount {
|
||||||
|
atLeastXNotes.fulfill()
|
||||||
|
}
|
||||||
case .eose:
|
case .eose:
|
||||||
// End of stored events
|
// End of stored events
|
||||||
subscribeExpectation.fulfill()
|
subscribeExpectation.fulfill()
|
||||||
@@ -153,7 +157,7 @@ class NostrNetworkManagerTests: XCTestCase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await fulfillment(of: [subscribeExpectation], timeout: 10.0)
|
await fulfillment(of: [subscribeExpectation, atLeastXNotes], timeout: 10.0)
|
||||||
|
|
||||||
// Verify we received exactly the expected number of unique events
|
// Verify we received exactly the expected number of unique events
|
||||||
XCTAssertEqual(count, expectedCount, "Should receive all \(expectedCount) events")
|
XCTAssertEqual(count, expectedCount, "Should receive all \(expectedCount) events")
|
||||||
|
|||||||
@@ -691,7 +691,7 @@ class Ndb {
|
|||||||
// We are setting the continuation after issuing the subscription call.
|
// We are setting the continuation after issuing the subscription call.
|
||||||
// This won't cause lost notes because if any notes get issued before registering
|
// This won't cause lost notes because if any notes get issued before registering
|
||||||
// the continuation they will get queued by `Ndb.CallbackHandler`
|
// the continuation they will get queued by `Ndb.CallbackHandler`
|
||||||
Task {
|
let continuationSetupTask = Task {
|
||||||
await self.setContinuation(for: subid, continuation: continuation)
|
await self.setContinuation(for: subid, continuation: continuation)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -701,6 +701,7 @@ class Ndb {
|
|||||||
terminationStarted = true
|
terminationStarted = true
|
||||||
Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
|
Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
|
||||||
streaming = false
|
streaming = false
|
||||||
|
continuationSetupTask.cancel()
|
||||||
Task { await self.unsetContinuation(subscriptionId: subid) }
|
Task { await self.unsetContinuation(subscriptionId: subid) }
|
||||||
filtersPointer.deallocate()
|
filtersPointer.deallocate()
|
||||||
guard !self.is_closed else { return } // Double-check Ndb is open before sending unsubscribe
|
guard !self.is_closed else { return } // Double-check Ndb is open before sending unsubscribe
|
||||||
@@ -711,19 +712,21 @@ class Ndb {
|
|||||||
|
|
||||||
func subscribe(filters: [NdbFilter], maxSimultaneousResults: Int = 1000) throws(NdbStreamError) -> AsyncStream<StreamItem> {
|
func subscribe(filters: [NdbFilter], maxSimultaneousResults: Int = 1000) throws(NdbStreamError) -> AsyncStream<StreamItem> {
|
||||||
guard !self.is_closed else { throw .ndbClosed }
|
guard !self.is_closed else { throw .ndbClosed }
|
||||||
// Fetch initial results
|
|
||||||
guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction }
|
|
||||||
|
|
||||||
do { try Task.checkCancellation() } catch { throw .cancelled }
|
do { try Task.checkCancellation() } catch { throw .cancelled }
|
||||||
|
|
||||||
|
// CRITICAL: Create the subscription FIRST before querying to avoid race condition
|
||||||
|
// This ensures that any events indexed after subscription but before query won't be missed
|
||||||
|
let newEventsStream = ndbSubscribe(filters: filters)
|
||||||
|
|
||||||
|
// Now fetch initial results after subscription is registered
|
||||||
|
guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction }
|
||||||
|
|
||||||
// Use our safe wrapper instead of direct C function call
|
// Use our safe wrapper instead of direct C function call
|
||||||
let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults)
|
let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults)
|
||||||
|
|
||||||
do { try Task.checkCancellation() } catch { throw .cancelled }
|
do { try Task.checkCancellation() } catch { throw .cancelled }
|
||||||
|
|
||||||
// Create a subscription for new events
|
|
||||||
let newEventsStream = ndbSubscribe(filters: filters)
|
|
||||||
|
|
||||||
// Create a cascading stream that combines initial results with new events
|
// Create a cascading stream that combines initial results with new events
|
||||||
return AsyncStream<StreamItem> { continuation in
|
return AsyncStream<StreamItem> { continuation in
|
||||||
// Stream all results already present in the database
|
// Stream all results already present in the database
|
||||||
|
|||||||
Reference in New Issue
Block a user