Modify NostrNetworkManager pipeline architecture

Previously, we combined the ndb and network stream within a "session
subscription" stream, which was teared down and rebuilt every time the
app went into the background and back to the foreground (This was done to
prevent crashes related to access to Ndb memory when Ndb is closed).

However, this caused complications and instability on the network
stream, leading to timeline staleness.

To address this, the pipeline was modified to merge the ndb and network
streams further upstream, on the multi-session stage, allowing the
session subscription streams to be completely split between Ndb and the
network.

For the ndb stream, we still tear it down and bring it up along the app
foreground state, to prevent memory crashes. However, the network stream
is kept intact between sessions, since RelayPool will now automatically
handle resubscription on websocket reconnection. This prevents
complexity and potential race conditions that could lead to timeline
staleness.

Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-10-05 15:21:57 -07:00
parent 3437cf5347
commit d9306d4153
2 changed files with 129 additions and 84 deletions

View File

@@ -112,64 +112,7 @@ extension NostrNetworkManager {
}
}
/// Subscribes to data from the user's relays
func advancedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
return AsyncStream<StreamItem> { continuation in
let subscriptionId = id ?? UUID()
let startTime = CFAbsoluteTimeGetCurrent()
Self.logger.info("Starting subscription \(subscriptionId.uuidString, privacy: .public): \(filters.debugDescription, privacy: .private)")
let multiSessionStreamingTask = Task {
while !Task.isCancelled {
do {
guard !self.ndb.is_closed else {
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Ndb closed. Sleeping for 1 second before resuming.")
try await Task.sleep(nanoseconds: 1_000_000_000)
continue
}
guard self.pool.open else {
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): RelayPool closed. Sleeping for 1 second before resuming.")
try await Task.sleep(nanoseconds: 1_000_000_000)
continue
}
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Streaming.")
for await item in self.sessionSubscribe(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) {
try Task.checkCancellation()
continuation.yield(item)
}
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Session subscription ended. Sleeping for 1 second before resuming.")
try await Task.sleep(nanoseconds: 1_000_000_000)
}
catch {
Self.logger.error("Session subscription \(subscriptionId.uuidString, privacy: .public): Error: \(error.localizedDescription, privacy: .public)")
}
}
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Terminated.")
}
let timeoutTask = Task {
if let timeout {
try await Task.sleep(for: timeout)
continuation.finish() // End the stream due to timeout.
}
}
continuation.onTermination = { @Sendable _ in
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Cancelled.")
multiSessionStreamingTask.cancel()
timeoutTask.cancel()
}
}
}
/// Subscribes to data from the user's relays
///
/// Only survives for a single session. This exits after the app is backgrounded
///
/// ## Implementation notes
///
/// - When we migrate to the local relay model, we should modify this function to stream directly from NostrDB
///
/// - Parameter filters: The nostr filters to specify what kind of data to subscribe to
/// - Returns: An async stream of nostr data
private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
let id = id ?? UUID()
let streamMode = streamMode ?? defaultStreamMode()
return AsyncStream<StreamItem> { continuation in
@@ -194,36 +137,66 @@ extension NostrNetworkManager {
}
}
let ndbStreamTask = Task {
do {
for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) {
let streamTask = Task {
while !Task.isCancelled {
for await item in self.multiSessionNetworkStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) {
try Task.checkCancellation()
switch item {
case .event(let lender):
continuation.yield(item)
case .eose:
Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from nostrdb. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds")
continuation.yield(.ndbEose)
break // Should not happen
case .ndbEose:
break // Should not happen
case .networkEose:
continuation.yield(item)
networkEOSEIssued = true
yieldEOSEIfReady()
}
}
}
}
let ndbStreamTask = Task {
while !Task.isCancelled {
for await item in self.multiSessionNdbStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) {
try Task.checkCancellation()
switch item {
case .event(let lender):
continuation.yield(item)
case .eose:
break // Should not happen
case .ndbEose:
continuation.yield(item)
ndbEOSEIssued = true
yieldEOSEIfReady()
case .event(let noteKey):
let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey)
try Task.checkCancellation()
guard let desiredRelays else {
continuation.yield(.event(lender: lender)) // If no desired relays are specified, return all notes we see.
break
}
if try ndb.was(noteKey: noteKey, seenOnAnyOf: desiredRelays) {
continuation.yield(.event(lender: lender)) // If desired relays were specified and this note was seen there, return it.
case .networkEose:
break // Should not happen
}
}
}
}
catch {
Self.logger.error("Session subscription \(id.uuidString, privacy: .public): NDB streaming error: \(error.localizedDescription, privacy: .public)")
continuation.onTermination = { @Sendable _ in
streamTask.cancel()
ndbStreamTask.cancel()
}
Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): NDB streaming ended")
continuation.finish()
}
}
private func multiSessionNetworkStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
let id = id ?? UUID()
let streamMode = streamMode ?? defaultStreamMode()
return AsyncStream<StreamItem> { continuation in
let startTime = CFAbsoluteTimeGetCurrent()
Self.logger.debug("Network subscription \(id.uuidString, privacy: .public): Started")
let streamTask = Task {
while !self.pool.open {
Self.logger.info("\(id.uuidString, privacy: .public): RelayPool closed. Sleeping for 1 second before resuming.")
try await Task.sleep(nanoseconds: 1_000_000_000)
continue
}
do {
for await item in self.pool.subscribe(filters: filters, to: desiredRelays, id: id) {
// NO-OP. Notes will be automatically ingested by NostrDB
@@ -243,27 +216,98 @@ extension NostrNetworkManager {
case .eose:
Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from the network. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds")
continuation.yield(.networkEose)
networkEOSEIssued = true
yieldEOSEIfReady()
}
}
}
catch {
Self.logger.error("Session subscription \(id.uuidString, privacy: .public): Network streaming error: \(error.localizedDescription, privacy: .public)")
Self.logger.error("Network subscription \(id.uuidString, privacy: .public): Streaming error: \(error.localizedDescription, privacy: .public)")
}
Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Network streaming ended")
Self.logger.debug("Network subscription \(id.uuidString, privacy: .public): Network streaming ended")
continuation.finish()
}
continuation.onTermination = { @Sendable _ in
streamTask.cancel()
}
}
}
private func multiSessionNdbStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
return AsyncStream<StreamItem> { continuation in
let subscriptionId = id ?? UUID()
let startTime = CFAbsoluteTimeGetCurrent()
Self.logger.info("Starting multi-session NDB subscription \(subscriptionId.uuidString, privacy: .public): \(filters.debugDescription, privacy: .private)")
let multiSessionStreamingTask = Task {
while !Task.isCancelled {
do {
guard !self.ndb.is_closed else {
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Ndb closed. Sleeping for 1 second before resuming.")
try await Task.sleep(nanoseconds: 1_000_000_000)
continue
}
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Streaming from NDB.")
for await item in self.sessionNdbStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) {
try Task.checkCancellation()
continuation.yield(item)
}
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Session subscription ended. Sleeping for 1 second before resuming.")
try await Task.sleep(nanoseconds: 1_000_000_000)
}
catch {
Self.logger.error("Session subscription \(subscriptionId.uuidString, privacy: .public): Error: \(error.localizedDescription, privacy: .public)")
}
}
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Terminated.")
}
continuation.onTermination = { @Sendable _ in
Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Cancelled multi-session NDB stream.")
multiSessionStreamingTask.cancel()
}
}
}
private func sessionNdbStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
let id = id ?? UUID()
//let streamMode = streamMode ?? defaultStreamMode()
return AsyncStream<StreamItem> { continuation in
let startTime = CFAbsoluteTimeGetCurrent()
Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Started")
let ndbStreamTask = Task {
do {
for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) {
try Task.checkCancellation()
switch item {
case .eose:
Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from nostrdb. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds")
continuation.yield(.ndbEose)
case .event(let noteKey):
let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey)
try Task.checkCancellation()
guard let desiredRelays else {
continuation.yield(.event(lender: lender)) // If no desired relays are specified, return all notes we see.
break
}
if try ndb.was(noteKey: noteKey, seenOnAnyOf: desiredRelays) {
continuation.yield(.event(lender: lender)) // If desired relays were specified and this note was seen there, return it.
}
}
}
}
catch {
Self.logger.error("Session subscription \(id.uuidString, privacy: .public): NDB streaming error: \(error.localizedDescription, privacy: .public)")
}
Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): NDB streaming ended")
continuation.finish()
}
Task {
// Add the ndb streaming task to the task manager so that it can be cancelled when the app is backgrounded
let ndbStreamTaskId = await self.taskManager.add(task: ndbStreamTask)
let streamTaskId = await self.taskManager.add(task: streamTask)
continuation.onTermination = { @Sendable _ in
Task {
await self.taskManager.cancelAndCleanUp(taskId: ndbStreamTaskId)
await self.taskManager.cancelAndCleanUp(taskId: streamTaskId)
}
}
}

View File

@@ -444,6 +444,7 @@ class RelayPool {
continue
}
Log.debug("%s: Sending resubscribe request to %s", for: .networking, handler.sub_id, relayId.absoluteString)
send(.subscribe(.init(filters: filters, sub_id: handler.sub_id)), to: [relayId], skip_ephemeral: shouldSkipEphemeralRelays)
}
}