Multi-session subscriptions and RelayPool reopening

This commit implements nostr network subscriptions that survive between
sessions, as well as improved handling of RelayPool opening/closing with
respect to the app lifecycle.

This prevents stale data after users swap out and back into Damus.

Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-08-27 18:59:20 -07:00
parent 46c3667ec3
commit c4c3656f90
8 changed files with 62 additions and 0 deletions

View File

@@ -521,6 +521,7 @@ struct ContentView: View {
break break
case .active: case .active:
print("txn: 📙 DAMUS ACTIVE") print("txn: 📙 DAMUS ACTIVE")
damus_state.nostrNetwork.connect()
damus_state.nostrNetwork.ping() damus_state.nostrNetwork.ping()
@unknown default: @unknown default:
break break

View File

@@ -42,6 +42,10 @@ extension NIP65 {
self.relays = Self.relayOrderedDictionary(from: relays) self.relays = Self.relayOrderedDictionary(from: relays)
} }
init() {
self.relays = Self.relayOrderedDictionary(from: [])
}
init(relays: [RelayURL]) { init(relays: [RelayURL]) {
let relayItemList = relays.map({ RelayItem(url: $0, rwConfiguration: .readWrite) }) let relayItemList = relays.map({ RelayItem(url: $0, rwConfiguration: .readWrite) })
self.relays = Self.relayOrderedDictionary(from: relayItemList) self.relays = Self.relayOrderedDictionary(from: relayItemList)

View File

@@ -50,6 +50,7 @@ class NostrNetworkManager {
/// Connects the app to the Nostr network /// Connects the app to the Nostr network
func connect() { func connect() {
self.userRelayList.connect() self.userRelayList.connect()
self.pool.open = true
} }
func disconnect() { func disconnect() {

View File

@@ -34,6 +34,54 @@ extension NostrNetworkManager {
/// - Parameter filters: The nostr filters to specify what kind of data to subscribe to /// - Parameter filters: The nostr filters to specify what kind of data to subscribe to
/// - Returns: An async stream of nostr data /// - Returns: An async stream of nostr data
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream<StreamItem> { func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream<StreamItem> {
return AsyncStream<StreamItem> { continuation in
let subscriptionId = UUID()
Log.info("Starting subscription %s: %s", for: .subscription_manager, subscriptionId.uuidString, filters.debugDescription)
let multiSessionStreamingTask = Task {
while !Task.isCancelled {
do {
guard !self.ndb.is_closed else {
Log.info("%s: Ndb closed. Sleeping for 1 second before resuming.", for: .subscription_manager, subscriptionId.uuidString)
try await Task.sleep(nanoseconds: 1_000_000_000)
continue
}
guard self.pool.open else {
Log.info("%s: RelayPool closed. Sleeping for 1 second before resuming.", for: .subscription_manager, subscriptionId.uuidString)
try await Task.sleep(nanoseconds: 1_000_000_000)
continue
}
Log.info("%s: Streaming.", for: .subscription_manager, subscriptionId.uuidString)
for await item in self.sessionSubscribe(filters: filters, to: desiredRelays) {
try Task.checkCancellation()
continuation.yield(item)
}
Log.info("%s: Session subscription ended. Sleeping for 1 second before resuming.", for: .subscription_manager, subscriptionId.uuidString)
try await Task.sleep(nanoseconds: 1_000_000_000)
}
catch {
Log.error("%s: Error: %s", for: .subscription_manager, subscriptionId.uuidString, error.localizedDescription)
}
}
Log.info("%s: Terminated.", for: .subscription_manager, subscriptionId.uuidString)
}
continuation.onTermination = { @Sendable _ in
Log.info("%s: Cancelled.", for: .subscription_manager, subscriptionId.uuidString)
multiSessionStreamingTask.cancel()
}
}
}
/// Subscribes to data from the user's relays
///
/// Only survives for a single session. This exits after the app is backgrounded
///
/// ## Implementation notes
///
/// - When we migrate to the local relay model, we should modify this function to stream directly from NostrDB
///
/// - Parameter filters: The nostr filters to specify what kind of data to subscribe to
/// - Returns: An async stream of nostr data
private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream<StreamItem> {
return AsyncStream<StreamItem> { continuation in return AsyncStream<StreamItem> { continuation in
let ndbStreamTask = Task { let ndbStreamTask = Task {
do { do {

View File

@@ -27,6 +27,7 @@ struct SeenEvent: Hashable {
/// Establishes and manages connections and subscriptions to a list of relays. /// Establishes and manages connections and subscriptions to a list of relays.
class RelayPool { class RelayPool {
private(set) var relays: [Relay] = [] private(set) var relays: [Relay] = []
var open: Bool = false
var handlers: [RelayHandler] = [] var handlers: [RelayHandler] = []
var request_queue: [QueuedRequest] = [] var request_queue: [QueuedRequest] = []
var seen: [NoteId: Set<RelayURL>] = [:] var seen: [NoteId: Set<RelayURL>] = [:]
@@ -46,6 +47,7 @@ class RelayPool {
func close() { func close() {
disconnect() disconnect()
relays = [] relays = []
open = false
handlers = [] handlers = []
request_queue = [] request_queue = []
seen.removeAll() seen.removeAll()
@@ -181,6 +183,7 @@ class RelayPool {
} }
func connect(to: [RelayURL]? = nil) { func connect(to: [RelayURL]? = nil) {
open = true
let relays = to.map{ get_relays($0) } ?? self.relays let relays = to.map{ get_relays($0) } ?? self.relays
for relay in relays { for relay in relays {
relay.connection.connect() relay.connection.connect()

View File

@@ -15,6 +15,8 @@ class NostrNetworkManagerTests: XCTestCase {
override func setUpWithError() throws { override func setUpWithError() throws {
// Put setup code here. This method is called before the invocation of each test method in the class. // Put setup code here. This method is called before the invocation of each test method in the class.
damusState = generate_test_damus_state(mock_profile_info: nil) damusState = generate_test_damus_state(mock_profile_info: nil)
try! damusState?.nostrNetwork.userRelayList.set(userRelayList: NIP65.RelayList())
damusState?.nostrNetwork.connect()
let notesJSONL = getTestNotesJSONL() let notesJSONL = getTestNotesJSONL()

View File

@@ -15,6 +15,8 @@ final class ThreadModelTests: XCTestCase {
override func setUpWithError() throws { override func setUpWithError() throws {
// Put setup code here. This method is called before the invocation of each test method in the class. // Put setup code here. This method is called before the invocation of each test method in the class.
damusState = generate_test_damus_state(mock_profile_info: nil) damusState = generate_test_damus_state(mock_profile_info: nil)
try! damusState?.nostrNetwork.userRelayList.set(userRelayList: NIP65.RelayList())
damusState?.nostrNetwork.connect()
let notesJSONL = getTestNotesJSONL() let notesJSONL = getTestNotesJSONL()

View File

@@ -712,6 +712,7 @@ class Ndb {
return AsyncStream<StreamItem> { continuation in return AsyncStream<StreamItem> { continuation in
// Stream all results already present in the database // Stream all results already present in the database
for noteId in noteIds { for noteId in noteIds {
if Task.isCancelled { return }
continuation.yield(.event(noteId)) continuation.yield(.event(noteId))
} }