Further improvements to app lifecycle handling

- Resend subscription requests to relays when websocket connection is
  re-established
- More safeguard checks on whether Ndb is opened before accessing its
  memory
- Cancel queued unsubscribe requests on app backgrounding to avoid race
  conditions with subscribe requests when app enters the foreground
- Call Ndb re-open when Damus is active (not only on active notify), as
  experimentally there have been instances where active notify code has
  not been run. The operation is idempotent, so there should be no risk
  of it being called twice.

Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-10-05 13:18:59 -07:00
parent 667a228e1a
commit 3437cf5347
7 changed files with 54 additions and 9 deletions

View File

@@ -538,6 +538,7 @@ struct ContentView: View {
Task {
await damusClosingTask?.value // Wait for the closing task to finish before reopening things, to avoid race conditions
damusClosingTask = nil
damus_state.ndb.reopen()
// Pinging the network will automatically reconnect any dead websocket connections
damus_state.nostrNetwork.ping()
}

View File

@@ -61,6 +61,7 @@ class NostrNetworkManager {
/// Performs cleanup when the app is about to enter the background.
///
/// Two steps, in order:
/// 1. Cancels all in-flight reader streaming tasks so they do not run while backgrounded.
/// 2. Purges queued relay requests that must not survive into the next foreground
///    session (e.g. pending unsubscribes that could race against fresh subscribes
///    when the app becomes active again).
func handleAppBackgroundRequest() async {
await self.reader.cancelAllTasks()
self.pool.cleanQueuedRequestForSessionEnd()
}
func close() async {

View File

@@ -220,6 +220,7 @@ extension NostrNetworkManager {
catch {
Self.logger.error("Session subscription \(id.uuidString, privacy: .public): NDB streaming error: \(error.localizedDescription, privacy: .public)")
}
Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): NDB streaming ended")
continuation.finish()
}
let streamTask = Task {
@@ -250,17 +251,19 @@ extension NostrNetworkManager {
catch {
Self.logger.error("Session subscription \(id.uuidString, privacy: .public): Network streaming error: \(error.localizedDescription, privacy: .public)")
}
Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Network streaming ended")
continuation.finish()
}
Task {
// Add the ndb streaming task to the task manager so that it can be cancelled when the app is backgrounded
let ndbStreamTaskId = await self.taskManager.add(task: ndbStreamTask)
let streamTaskId = await self.taskManager.add(task: streamTask)
continuation.onTermination = { @Sendable _ in
Task {
await self.taskManager.cancelAndCleanUp(taskId: ndbStreamTaskId)
streamTask.cancel()
await self.taskManager.cancelAndCleanUp(taskId: streamTaskId)
}
}
}

View File

@@ -10,6 +10,8 @@ import Network
/// Associates a relay subscription with its filters, target relays, and event callback.
struct RelayHandler {
/// The subscription identifier sent to relays for this handler.
let sub_id: String
/// The filters originally subscribed with; `nil` for handlers registered without
/// a subscription (callback-only handlers), which cannot be resubscribed on reconnect.
let filters: [NostrFilter]?
/// The specific relays targeted by this subscription; `nil` means the user's relay list.
let to: [RelayURL]?
/// Invoked for each connection event received from a relay for this subscription.
var callback: (RelayURL, NostrConnectionEvent) -> ()
}
@@ -106,7 +108,7 @@ class RelayPool {
}
@MainActor
func register_handler(sub_id: String, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) async {
func register_handler(sub_id: String, filters: [NostrFilter]?, to relays: [RelayURL]? = nil, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) async {
while handlers.count > Self.MAX_CONCURRENT_SUBSCRIPTION_LIMIT {
Log.debug("%s: Too many subscriptions, waiting for subscription pool to clear", for: .networking, sub_id)
try? await Task.sleep(for: .seconds(1))
@@ -121,7 +123,7 @@ class RelayPool {
return true
}
})
self.handlers.append(RelayHandler(sub_id: sub_id, callback: handler))
self.handlers.append(RelayHandler(sub_id: sub_id, filters: filters, to: relays, callback: handler))
Log.debug("Registering %s handler, current: %d", for: .networking, sub_id, self.handlers.count)
}
@@ -211,6 +213,23 @@ class RelayPool {
relay.connection.disconnect()
}
}
/// Deletes queued up requests that should not persist between app sessions (i.e. when the app goes to background then back to foreground)
func cleanQueuedRequestForSessionEnd() {
    request_queue = request_queue.filter { queuedRequest in
        // Non-typical requests are always kept.
        guard case .typical(let typicalRequest) = queuedRequest.req else { return true }
        switch typicalRequest {
        case .subscribe, .event, .auth:
            return true
        case .unsubscribe:
            // Drop unsubscribe requests so they cannot race against subscribe
            // requests when we come back to the foreground.
            return false
        }
    }
}
func unsubscribe(sub_id: String, to: [RelayURL]? = nil) {
if to == nil {
@@ -221,7 +240,7 @@ class RelayPool {
func subscribe(sub_id: String, filters: [NostrFilter], handler: @escaping (RelayURL, NostrConnectionEvent) -> (), to: [RelayURL]? = nil) {
Task {
await register_handler(sub_id: sub_id, handler: handler)
await register_handler(sub_id: sub_id, filters: filters, to: to, handler: handler)
// When the caller specifies no relays, it is implied that the user wants to use the ones in the user relay list. Skip ephemeral relays in that case.
// When the caller specifies specific relays, do not skip ephemeral relays to respect the exact list given by the caller.
@@ -304,7 +323,7 @@ class RelayPool {
func subscribe_to(sub_id: String, filters: [NostrFilter], to: [RelayURL]?, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) {
Task {
await register_handler(sub_id: sub_id, handler: handler)
await register_handler(sub_id: sub_id, filters: filters, to: to, handler: handler)
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
}
@@ -411,14 +430,34 @@ class RelayPool {
}
}
}
/// Re-sends subscription requests for every registered handler to the given relay.
///
/// Intended to be called when a relay's websocket connection is (re-)established,
/// so subscriptions created while the connection was down are not silently lost.
/// - Parameter relayId: The relay to which subscriptions should be re-sent.
func resubscribeAll(relayId: RelayURL) {
    for handler in self.handlers {
        // Handlers registered without filters (callback-only) have nothing to resubscribe.
        guard let filters = handler.filters else { continue }
        // When the caller specifies no relays, it is implied that the user wants to use the ones in the user relay list. Skip ephemeral relays in that case.
        // When the caller specifies specific relays, do not skip ephemeral relays to respect the exact list given by the caller.
        let shouldSkipEphemeralRelays = handler.to == nil
        if let handlerTargetRelays = handler.to,
           !handlerTargetRelays.contains(where: { $0 == relayId }) {
            // This relay is not part of the handler's target relays; skip it.
            continue
        }
        send(.subscribe(.init(filters: filters, sub_id: handler.sub_id)), to: [relayId], skip_ephemeral: shouldSkipEphemeralRelays)
    }
}
func handle_event(relay_id: RelayURL, event: NostrConnectionEvent) {
record_seen(relay_id: relay_id, event: event)
// run req queue when we reconnect
// When we reconnect, do two things
// - Send messages that were stored in the queue
// - Re-subscribe to filters we had subscribed before
if case .ws_connection_event(let ws) = event {
if case .connected = ws {
run_queue(relay_id)
self.resubscribeAll(relayId: relay_id)
}
}

View File

@@ -142,7 +142,7 @@ struct SaveKeysView: View {
add_rw_relay(self.pool, relay)
}
Task { await self.pool.register_handler(sub_id: "signup", handler: handle_event) }
Task { await self.pool.register_handler(sub_id: "signup", filters: nil, handler: handle_event) }
self.loading = true

View File

@@ -60,7 +60,7 @@ class PostBox {
init(pool: RelayPool) {
self.pool = pool
self.events = [:]
Task { await pool.register_handler(sub_id: "postbox", handler: handle_event) }
Task { await pool.register_handler(sub_id: "postbox", filters: nil, to: nil, handler: handle_event) }
}
// only works reliably on delay-sent events

View File

@@ -701,9 +701,10 @@ class Ndb {
terminationStarted = true
Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
streaming = false
ndb_unsubscribe(self.ndb.ndb, subid)
Task { await self.unsetCallback(subscriptionId: subid) }
filtersPointer.deallocate()
guard !self.is_closed else { return } // Double-check Ndb is open before sending unsubscribe
ndb_unsubscribe(self.ndb.ndb, subid)
}
}
}