Improve task cancellation management in SubscriptionManager

The widespread usage of the SubscriptionManager caused new crashes to
occur when swapping apps.

This was caused due to an access to Ndb memory after Ndb has been closed
from the app background signal.

The issue was fixed with improved task management logic and ensuring all
subscription tasks are finished before closing Ndb.

Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-08-27 12:10:36 -07:00
parent 76b6d5c545
commit a5aff15491
6 changed files with 72 additions and 10 deletions

View File

@@ -512,6 +512,7 @@ struct ContentView: View {
case .background: case .background:
print("txn: 📙 DAMUS BACKGROUNDED") print("txn: 📙 DAMUS BACKGROUNDED")
Task { @MainActor in Task { @MainActor in
await damus_state.nostrNetwork.close() // Close ndb streaming tasks before closing ndb to avoid memory errors
damus_state.ndb.close() damus_state.ndb.close()
} }
break break

View File

@@ -234,7 +234,8 @@ class NostrNetworkManager {
// MARK: - App lifecycle functions // MARK: - App lifecycle functions
func close() { func close() async {
await self.reader.cancelAllTasks()
pool.close() pool.close()
} }
} }

View File

@@ -4,6 +4,7 @@
// //
// Created by Daniel DAquino on 2025-03-25. // Created by Daniel DAquino on 2025-03-25.
// //
import Foundation
extension NostrNetworkManager { extension NostrNetworkManager {
/// Reads or fetches information from RelayPool and NostrDB, and provides an easier and unified higher-level interface. /// Reads or fetches information from RelayPool and NostrDB, and provides an easier and unified higher-level interface.
@@ -14,10 +15,12 @@ extension NostrNetworkManager {
class SubscriptionManager { class SubscriptionManager {
private let pool: RelayPool private let pool: RelayPool
private var ndb: Ndb private var ndb: Ndb
private var taskManager: TaskManager
init(pool: RelayPool, ndb: Ndb) { init(pool: RelayPool, ndb: Ndb) {
self.pool = pool self.pool = pool
self.ndb = ndb self.ndb = ndb
self.taskManager = TaskManager()
} }
// MARK: - Reading data from Nostr // MARK: - Reading data from Nostr
@@ -35,6 +38,7 @@ extension NostrNetworkManager {
let ndbStreamTask = Task { let ndbStreamTask = Task {
do { do {
for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) { for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) {
try Task.checkCancellation()
switch item { switch item {
case .eose: case .eose:
continuation.yield(.eose) continuation.yield(.eose)
@@ -48,24 +52,71 @@ extension NostrNetworkManager {
} }
lend(unownedNote) lend(unownedNote)
} }
try Task.checkCancellation()
continuation.yield(.event(borrow: lender)) continuation.yield(.event(borrow: lender))
} }
} }
} }
catch { catch {
Log.error("NDB streaming error: %s", for: .ndb, error.localizedDescription) Log.error("NDB streaming error: %s", for: .subscription_manager, error.localizedDescription)
} }
continuation.finish()
} }
let streamTask = Task { let streamTask = Task {
for await _ in self.pool.subscribe(filters: filters, to: desiredRelays) { do {
// NO-OP. Notes will be automatically ingested by NostrDB for await _ in self.pool.subscribe(filters: filters, to: desiredRelays) {
// TODO: Improve efficiency of subscriptions? // NO-OP. Notes will be automatically ingested by NostrDB
// TODO: Improve efficiency of subscriptions?
try Task.checkCancellation()
}
}
catch {
Log.error("Network streaming error: %s", for: .subscription_manager, error.localizedDescription)
}
continuation.finish()
}
Task {
let ndbStreamTaskId = await self.taskManager.add(task: ndbStreamTask)
let streamTaskId = await self.taskManager.add(task: streamTask)
continuation.onTermination = { @Sendable _ in
Task {
await self.taskManager.cancelAndCleanUp(taskId: ndbStreamTaskId)
await self.taskManager.cancelAndCleanUp(taskId: streamTaskId)
}
} }
} }
continuation.onTermination = { @Sendable _ in }
streamTask.cancel() // Close the RelayPool stream when caller stops streaming }
ndbStreamTask.cancel()
func cancelAllTasks() async {
await self.taskManager.cancelAllTasks()
}
actor TaskManager {
private var tasks: [UUID: Task<Void, Never>] = [:]
func add(task: Task<Void, Never>) -> UUID {
let taskId = UUID()
self.tasks[taskId] = task
return taskId
}
func cancelAndCleanUp(taskId: UUID) async {
self.tasks[taskId]?.cancel()
await self.tasks[taskId]?.value
self.tasks[taskId] = nil
return
}
func cancelAllTasks() async {
Log.info("Cancelling all SubscriptionManager tasks", for: .subscription_manager)
for (taskId, _) in self.tasks {
Log.info("Cancelling SubscriptionManager task %s", for: .subscription_manager, taskId.uuidString)
await cancelAndCleanUp(taskId: taskId)
} }
Log.info("Cancelled all SubscriptionManager tasks", for: .subscription_manager)
} }
} }
} }

View File

@@ -164,8 +164,10 @@ class DamusState: HeadlessDamusState {
try await self.push_notification_client.revoke_token() try await self.push_notification_client.revoke_token()
} }
wallet.disconnect() wallet.disconnect()
nostrNetwork.close() Task {
ndb.close() await nostrNetwork.close() // Close ndb streaming tasks before closing ndb to avoid memory errors
ndb.close()
}
} }
static var empty: DamusState { static var empty: DamusState {

View File

@@ -14,6 +14,7 @@ enum LogCategory: String {
case render case render
case storage case storage
case networking case networking
case subscription_manager
case timeline case timeline
/// Logs related to Nostr Wallet Connect components /// Logs related to Nostr Wallet Connect components
case nwc case nwc

View File

@@ -698,9 +698,13 @@ class Ndb {
// Fetch initial results // Fetch initial results
guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction } guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction }
do { try Task.checkCancellation() } catch { throw .cancelled }
// Use our safe wrapper instead of direct C function call // Use our safe wrapper instead of direct C function call
let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults) let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults)
do { try Task.checkCancellation() } catch { throw .cancelled }
// Create a subscription for new events // Create a subscription for new events
let newEventsStream = ndbSubscribe(filters: filters) let newEventsStream = ndbSubscribe(filters: filters)
@@ -717,6 +721,7 @@ class Ndb {
// Create a task to forward events from the subscription stream // Create a task to forward events from the subscription stream
let forwardingTask = Task { let forwardingTask = Task {
for await item in newEventsStream { for await item in newEventsStream {
try Task.checkCancellation()
continuation.yield(item) continuation.yield(item)
} }
continuation.finish() continuation.finish()
@@ -876,6 +881,7 @@ extension Ndb {
case cannotConvertFilter(any Error) case cannotConvertFilter(any Error)
case initialQueryFailed case initialQueryFailed
case timeout case timeout
case cancelled
} }
/// An error that may happen when looking something up /// An error that may happen when looking something up