Fix timeline staleness

Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-10-01 14:23:57 -07:00
parent 32e8c1b6e1
commit 84c4594d30
5 changed files with 37 additions and 21 deletions

View File

@@ -475,7 +475,7 @@ struct ContentView: View {
} }
} }
.onReceive(handle_notify(.disconnect_relays)) { () in .onReceive(handle_notify(.disconnect_relays)) { () in
damus_state.nostrNetwork.disconnect() damus_state.nostrNetwork.disconnectRelays()
} }
.onReceive(NotificationCenter.default.publisher(for: UIApplication.willEnterForegroundNotification)) { obj in .onReceive(NotificationCenter.default.publisher(for: UIApplication.willEnterForegroundNotification)) { obj in
print("txn: 📙 DAMUS ACTIVE NOTIFY") print("txn: 📙 DAMUS ACTIVE NOTIFY")
@@ -523,7 +523,7 @@ struct ContentView: View {
damusClosingTask = Task { @MainActor in damusClosingTask = Task { @MainActor in
Log.debug("App background signal handling: App being backgrounded", for: .app_lifecycle) Log.debug("App background signal handling: App being backgrounded", for: .app_lifecycle)
let startTime = CFAbsoluteTimeGetCurrent() let startTime = CFAbsoluteTimeGetCurrent()
await damus_state.nostrNetwork.close() // Close ndb streaming tasks before closing ndb to avoid memory errors await damus_state.nostrNetwork.handleAppBackgroundRequest() // Close ndb streaming tasks before closing ndb to avoid memory errors
Log.debug("App background signal handling: Nostr network closed after %.2f seconds", for: .app_lifecycle, CFAbsoluteTimeGetCurrent() - startTime) Log.debug("App background signal handling: Nostr network closed after %.2f seconds", for: .app_lifecycle, CFAbsoluteTimeGetCurrent() - startTime)
damus_state.ndb.close() damus_state.ndb.close()
Log.debug("App background signal handling: Ndb closed after %.2f seconds", for: .app_lifecycle, CFAbsoluteTimeGetCurrent() - startTime) Log.debug("App background signal handling: Ndb closed after %.2f seconds", for: .app_lifecycle, CFAbsoluteTimeGetCurrent() - startTime)
@@ -537,7 +537,8 @@ struct ContentView: View {
print("txn: 📙 DAMUS ACTIVE") print("txn: 📙 DAMUS ACTIVE")
Task { Task {
await damusClosingTask?.value // Wait for the closing task to finish before reopening things, to avoid race conditions await damusClosingTask?.value // Wait for the closing task to finish before reopening things, to avoid race conditions
damus_state.nostrNetwork.connect() damusClosingTask = nil
// Pinging the network will automatically reconnect any dead websocket connections
damus_state.nostrNetwork.ping() damus_state.nostrNetwork.ping()
} }
@unknown default: @unknown default:

View File

@@ -51,15 +51,18 @@ class NostrNetworkManager {
/// Connects the app to the Nostr network /// Connects the app to the Nostr network
func connect() { func connect() {
self.userRelayList.connect() self.userRelayList.connect() // Will load the user's list, apply it, and get RelayPool to connect to it.
self.pool.open = true
Task { await self.profilesManager.load() } Task { await self.profilesManager.load() }
} }
func disconnect() { func disconnectRelays() {
self.pool.disconnect() self.pool.disconnect()
} }
func handleAppBackgroundRequest() async {
await self.reader.cancelAllTasks()
}
func close() async { func close() async {
await withTaskGroup { group in await withTaskGroup { group in
// Spawn each cancellation task in parallel for faster execution speed // Spawn each cancellation task in parallel for faster execution speed
@@ -69,9 +72,9 @@ class NostrNetworkManager {
group.addTask { group.addTask {
await self.profilesManager.stop() await self.profilesManager.stop()
} }
pool.close()
// But await on each one to prevent race conditions // But await on each one to prevent race conditions
for await value in group { continue } for await value in group { continue }
pool.close()
} }
} }

View File

@@ -254,13 +254,13 @@ extension NostrNetworkManager {
} }
Task { Task {
// Add the ndb streaming task to the task manager so that it can be cancelled when the app is backgrounded
let ndbStreamTaskId = await self.taskManager.add(task: ndbStreamTask) let ndbStreamTaskId = await self.taskManager.add(task: ndbStreamTask)
let streamTaskId = await self.taskManager.add(task: streamTask)
continuation.onTermination = { @Sendable _ in continuation.onTermination = { @Sendable _ in
Task { Task {
await self.taskManager.cancelAndCleanUp(taskId: ndbStreamTaskId) await self.taskManager.cancelAndCleanUp(taskId: ndbStreamTaskId)
await self.taskManager.cancelAndCleanUp(taskId: streamTaskId) streamTask.cancel()
} }
} }
} }

View File

@@ -236,9 +236,14 @@ extension NostrNetworkManager {
) )
changed = true changed = true
} }
// Always tell RelayPool to connect whether or not we are already connected.
// This is because:
// 1. Internally it won't redo the connection because of internal checks
// 2. Even if the relay list has not changed, relays may have been disconnected from app lifecycle or other events
pool.connect()
if changed { if changed {
pool.connect()
notify(.relays_changed) notify(.relays_changed)
} }
} }

View File

@@ -10,7 +10,7 @@ import Network
struct RelayHandler { struct RelayHandler {
let sub_id: String let sub_id: String
let callback: (RelayURL, NostrConnectionEvent) -> () var callback: (RelayURL, NostrConnectionEvent) -> ()
} }
struct QueuedRequest { struct QueuedRequest {
@@ -95,7 +95,7 @@ class RelayPool {
func remove_handler(sub_id: String) { func remove_handler(sub_id: String) {
self.handlers = handlers.filter { $0.sub_id != sub_id } self.handlers = handlers.filter { $0.sub_id != sub_id }
print("removing \(sub_id) handler, current: \(handlers.count)") Log.debug("Removing %s handler, current: %d", for: .networking, sub_id, handlers.count)
} }
func ping() { func ping() {
@@ -112,16 +112,17 @@ class RelayPool {
try? await Task.sleep(for: .seconds(1)) try? await Task.sleep(for: .seconds(1))
} }
Log.debug("%s: Subscription pool cleared", for: .networking, sub_id) Log.debug("%s: Subscription pool cleared", for: .networking, sub_id)
for handler in handlers { handlers = handlers.filter({ handler in
// don't add duplicate handlers
if handler.sub_id == sub_id { if handler.sub_id == sub_id {
assertionFailure("Duplicate handlers are not allowed. Proper error handling for this has not been built yet.") Log.error("Duplicate handler detected for the same subscription ID. Overriding.", for: .networking)
Log.error("Duplicate handlers are not allowed. Error handling for this has not been built yet.", for: .networking) return false
return
} }
} else {
return true
}
})
self.handlers.append(RelayHandler(sub_id: sub_id, callback: handler)) self.handlers.append(RelayHandler(sub_id: sub_id, callback: handler))
print("registering \(sub_id) handler, current: \(self.handlers.count)") Log.debug("Registering %s handler, current: %d", for: .networking, sub_id, self.handlers.count)
} }
func remove_relay(_ relay_id: RelayURL) { func remove_relay(_ relay_id: RelayURL) {
@@ -194,14 +195,17 @@ class RelayPool {
} }
func connect(to: [RelayURL]? = nil) { func connect(to: [RelayURL]? = nil) {
open = true
let relays = to.map{ get_relays($0) } ?? self.relays let relays = to.map{ get_relays($0) } ?? self.relays
for relay in relays { for relay in relays {
relay.connection.connect() relay.connection.connect()
} }
// Mark as open last, to prevent other classes from pulling data before the relays are actually connected
open = true
} }
func disconnect(to: [RelayURL]? = nil) { func disconnect(to: [RelayURL]? = nil) {
// Mark as closed first, to prevent other classes from pulling data while the relays are being disconnected
open = false
let relays = to.map{ get_relays($0) } ?? self.relays let relays = to.map{ get_relays($0) } ?? self.relays
for relay in relays { for relay in relays {
relay.connection.disconnect() relay.connection.disconnect()
@@ -218,9 +222,11 @@ class RelayPool {
func subscribe(sub_id: String, filters: [NostrFilter], handler: @escaping (RelayURL, NostrConnectionEvent) -> (), to: [RelayURL]? = nil) { func subscribe(sub_id: String, filters: [NostrFilter], handler: @escaping (RelayURL, NostrConnectionEvent) -> (), to: [RelayURL]? = nil) {
Task { Task {
await register_handler(sub_id: sub_id, handler: handler) await register_handler(sub_id: sub_id, handler: handler)
// When the caller specifies no relays, it is implied that the user wants to use the ones in the user relay list. Skip ephemeral relays in that case. // When the caller specifies no relays, it is implied that the user wants to use the ones in the user relay list. Skip ephemeral relays in that case.
// When the caller specifies specific relays, do not skip ephemeral relays to respect the exact list given by the caller. // When the caller specifies specific relays, do not skip ephemeral relays to respect the exact list given by the caller.
let shouldSkipEphemeralRelays = to == nil ? true : false let shouldSkipEphemeralRelays = to == nil ? true : false
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to, skip_ephemeral: shouldSkipEphemeralRelays) send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to, skip_ephemeral: shouldSkipEphemeralRelays)
} }
} }
@@ -299,6 +305,7 @@ class RelayPool {
func subscribe_to(sub_id: String, filters: [NostrFilter], to: [RelayURL]?, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) { func subscribe_to(sub_id: String, filters: [NostrFilter], to: [RelayURL]?, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) {
Task { Task {
await register_handler(sub_id: sub_id, handler: handler) await register_handler(sub_id: sub_id, handler: handler)
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to) send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
} }
} }
@@ -313,7 +320,7 @@ class RelayPool {
return c return c
} }
@MainActor @MainActor
func queue_req(r: NostrRequestType, relay: RelayURL, skip_ephemeral: Bool) { func queue_req(r: NostrRequestType, relay: RelayURL, skip_ephemeral: Bool) {
let count = count_queued(relay: relay) let count = count_queued(relay: relay)