Reduce race condition probability in Ndb streaming functions

This attempts to reduce race conditions coming from Ndb streaming
functions that could lead to lost notes or crashes.

It does so by making two improvements:
1. The callback handler now uses async streams instead of callbacks,
   which reduces the chances of a callback being called before the
   consumer has finished processing the previous item.
2. The callback handler will now queue up received notes if there are
   no listeners yet. This is helpful because we need to issue the
   subscribe call to nostrdb before getting the subscription id and
   setting up a listener, but in the meantime nostrdb may already
   send notes, which would effectively get dropped without this
   queuing mechanism.

Changelog-Fixed: Improved robustness in the part of the code that streams notes from nostrdb
Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-10-24 16:22:26 -07:00
parent 10b4d804f8
commit 7afcaa99fe
2 changed files with 62 additions and 29 deletions

View File

@@ -42,6 +42,7 @@ class NostrNetworkManagerTests: XCTestCase {
func ensureSubscribeGetsAllExpectedNotes(filter: NostrFilter, expectedCount: Int) async {
let endOfStream = XCTestExpectation(description: "Stream should receive EOSE")
let atLeastXEvents = XCTestExpectation(description: "Stream should get at least the expected number of notes")
var receivedCount = 0
var eventIds: Set<NoteId> = []
Task {
@@ -55,6 +56,9 @@ class NostrNetworkManagerTests: XCTestCase {
}
eventIds.insert(event.id)
}
if eventIds.count >= expectedCount {
atLeastXEvents.fulfill()
}
case .eose:
continue
case .ndbEose:
@@ -66,7 +70,7 @@ class NostrNetworkManagerTests: XCTestCase {
}
}
}
await fulfillment(of: [endOfStream], timeout: 15.0)
await fulfillment(of: [endOfStream, atLeastXEvents], timeout: 15.0)
XCTAssertEqual(receivedCount, expectedCount, "Event IDs: \(eventIds.map({ $0.hex() }))")
}

View File

@@ -653,9 +653,9 @@ class Ndb {
/// Safe wrapper around `ndb_subscribe` that handles all pointer management
/// - Parameters:
/// - filters: Array of NdbFilter objects
/// - Returns: AsyncStream of StreamItem events for new matches only
private func ndbSubscribe(filters: [NdbFilter]) -> AsyncStream<StreamItem> {
return AsyncStream<StreamItem> { continuation in
/// - Returns: AsyncStream of NoteKey events for new matches only
private func ndbSubscribe(filters: [NdbFilter]) -> AsyncStream<NoteKey> {
return AsyncStream<NoteKey> { continuation in
// Allocate filters pointer - will be deallocated when subscription ends
// Cannot use `defer` to deallocate `filtersPointer` because it needs to remain valid for the lifetime of the subscription, which extends beyond this block's scope.
let filtersPointer = UnsafeMutablePointer<ndb_filter>.allocate(capacity: filters.count)
@@ -676,7 +676,7 @@ class Ndb {
// Clean up resources on early termination
if subid != 0 {
ndb_unsubscribe(self.ndb.ndb, subid)
Task { await self.unsetCallback(subscriptionId: subid) }
Task { await self.unsetContinuation(subscriptionId: subid) }
}
filtersPointer.deallocate()
}
@@ -688,11 +688,11 @@ class Ndb {
// Set up subscription
subid = ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count))
// Set the subscription callback
// We are setting the continuation after issuing the subscription call.
// This won't cause lost notes because if any notes get issued before registering
// the continuation they will get queued by `Ndb.CallbackHandler`
Task {
await self.setCallback(for: subid, callback: { noteKey in
continuation.yield(.event(noteKey))
})
await self.setContinuation(for: subid, continuation: continuation)
}
// Update termination handler to include subscription cleanup
@@ -701,7 +701,7 @@ class Ndb {
terminationStarted = true
Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
streaming = false
Task { await self.unsetCallback(subscriptionId: subid) }
Task { await self.unsetContinuation(subscriptionId: subid) }
filtersPointer.deallocate()
guard !self.is_closed else { return } // Double-check Ndb is open before sending unsubscribe
ndb_unsubscribe(self.ndb.ndb, subid)
@@ -737,9 +737,9 @@ class Ndb {
// Create a task to forward events from the subscription stream
let forwardingTask = Task {
for await item in newEventsStream {
try Task.checkCancellation()
continuation.yield(item)
for await noteKey in newEventsStream {
if Task.isCancelled { break }
continuation.yield(.event(noteKey))
}
continuation.finish()
}
@@ -840,11 +840,11 @@ class Ndb {
// MARK: Internal ndb callback interfaces
internal func setCallback(for subscriptionId: UInt64, callback: @escaping (NoteKey) -> Void) async {
await self.callbackHandler.set(callback: callback, for: subscriptionId)
internal func setContinuation(for subscriptionId: UInt64, continuation: AsyncStream<NoteKey>.Continuation) async {
await self.callbackHandler.set(continuation: continuation, for: subscriptionId)
}
internal func unsetCallback(subscriptionId: UInt64) async {
internal func unsetContinuation(subscriptionId: UInt64) async {
await self.callbackHandler.unset(subid: subscriptionId)
}
@@ -873,31 +873,60 @@ extension Ndb {
actor CallbackHandler {
/// Holds the ndb instance in the C codebase. Should be shared with `Ndb`
var ndb: ndb_t? = nil
/// A map from nostrdb subscription ids to callbacks
var subscriptionCallbackMap: [UInt64: (NoteKey) -> Void] = [:]
/// A map from nostrdb subscription ids to stream continuations, which allows publishing note keys to active listeners
var subscriptionContinuationMap: [UInt64: AsyncStream<NoteKey>.Continuation] = [:]
/// A map from nostrdb subscription ids to queued note keys (for when there are no active listeners)
var subscriptionQueueMap: [UInt64: [NoteKey]] = [:]
/// Maximum number of items to queue per subscription when no one is listening
let maxQueueItemsPerSubscription: Int = 2000
func set(callback: @escaping (NoteKey) -> Void, for subid: UInt64) {
subscriptionCallbackMap[subid] = callback
func set(continuation: AsyncStream<NoteKey>.Continuation, for subid: UInt64) {
// Flush any queued items to the new continuation
if let queuedItems = subscriptionQueueMap[subid] {
for noteKey in queuedItems {
continuation.yield(noteKey)
}
subscriptionQueueMap[subid] = nil
}
subscriptionContinuationMap[subid] = continuation
}
func unset(subid: UInt64) {
subscriptionCallbackMap[subid] = nil
subscriptionContinuationMap[subid] = nil
subscriptionQueueMap[subid] = nil
}
func set(ndb: ndb_t?) {
self.ndb = ndb
}
/// Handles callbacks from nostrdb subscriptions, and routes them to the correct callback
/// Handles callbacks from nostrdb subscriptions, and routes them to the correct continuation or queue
func handleSubscriptionCallback(subId: UInt64, maxCapacity: Int32 = 1000) {
if let callback = subscriptionCallbackMap[subId] {
let result = UnsafeMutablePointer<UInt64>.allocate(capacity: Int(maxCapacity))
defer { result.deallocate() } // Ensure we deallocate memory before leaving the function to avoid memory leaks
if let ndb {
let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity)
for i in 0..<numberOfNotes {
callback(result.advanced(by: Int(i)).pointee)
let result = UnsafeMutablePointer<UInt64>.allocate(capacity: Int(maxCapacity))
defer { result.deallocate() } // Ensure we deallocate memory before leaving the function to avoid memory leaks
guard let ndb else { return }
let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity)
for i in 0..<numberOfNotes {
let noteKey = result.advanced(by: Int(i)).pointee
if let continuation = subscriptionContinuationMap[subId] {
// Send directly to the active listener stream
continuation.yield(noteKey)
} else {
// No one is listening, queue it for later
var queue = subscriptionQueueMap[subId] ?? []
// Ensure queue stays within the desired size
while queue.count >= maxQueueItemsPerSubscription {
queue.removeFirst()
}
queue.append(noteKey)
subscriptionQueueMap[subId] = queue
}
}
}