From 2185984ed7750f83f5a84c06a68df2baba39d2b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20D=E2=80=99Aquino?= Date: Mon, 15 Sep 2025 11:20:20 -0700 Subject: [PATCH] Stream from both NDB and network relays MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit takes a step back from the full local relay model by treating NostrDB as one of the many relays streamed from, instead of the one exclusive relay that other classes rely on. This was done to reduce regression risk from the local relay model migration, without discarding the migration work already done. The full "local relay model" behavior (exclusive NDB streaming) was hidden behind a feature flag for easy migration later on. Closes: https://github.com/damus-io/damus/issues/3225 Signed-off-by: Daniel D’Aquino --- .../NostrNetworkManager.swift | 5 +++- .../SubscriptionManager.swift | 27 +++++++++++++++++-- damus/Core/Nostr/RelayPool.swift | 2 +- damus/Core/Storage/DamusState.swift | 1 + .../Settings/Models/UserSettingsStore.swift | 4 +++ .../NostrNetworkManagerTests.swift | 2 +- nostrdb/Ndb+.swift | 2 +- 7 files changed, 37 insertions(+), 6 deletions(-) diff --git a/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift b/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift index ac11b97a..80321f8b 100644 --- a/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift @@ -38,7 +38,7 @@ class NostrNetworkManager { self.delegate = delegate let pool = RelayPool(ndb: delegate.ndb, keypair: delegate.keypair) self.pool = pool - let reader = SubscriptionManager(pool: pool, ndb: delegate.ndb) + let reader = SubscriptionManager(pool: pool, ndb: delegate.ndb, experimentalLocalRelayModelSupport: self.delegate.experimentalLocalRelayModelSupport) let userRelayList = UserRelayListManager(delegate: delegate, pool: pool, reader: reader) self.reader = reader self.userRelayList = userRelayList @@ -174,6 +174,9 @@ extension NostrNetworkManager { /// Whether the app is in developer mode var developerMode: Bool { get } + /// Whether the app has the experimental local relay model flag that streams data only from the local relay (ndb) + var experimentalLocalRelayModelSupport: Bool { get } + /// The cache of relay model information var relayModelCache: RelayModelCache { get } diff --git a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift index aa3b7de2..ca79def5 100644 --- a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift @@ -16,11 +16,13 @@ extension NostrNetworkManager { private let pool: RelayPool private var ndb: Ndb private var taskManager: TaskManager + private let experimentalLocalRelayModelSupport: Bool - init(pool: RelayPool, ndb: Ndb) { + init(pool: RelayPool, ndb: Ndb, experimentalLocalRelayModelSupport: Bool) { self.pool = pool self.ndb = ndb self.taskManager = TaskManager() + self.experimentalLocalRelayModelSupport = experimentalLocalRelayModelSupport } // MARK: - Subscribing and Streaming data from Nostr @@ -127,13 +129,28 @@ extension NostrNetworkManager { /// - Returns: An async stream of nostr data private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream { return AsyncStream { continuation in + var ndbEOSEIssued = false + var networkEOSEIssued = false + + // This closure function issues (yields) an EOSE signal to the stream if all relevant conditions are met + let yieldEOSEIfReady = { + // In normal mode: Issuing EOSE requires EOSE from both NDB and the network, since they are all considered separate relays + // In experimental local relay model mode: Issuing EOSE requires only EOSE from NDB, since that is the only relay that "matters" + let canIssueEOSE = self.experimentalLocalRelayModelSupport ? ndbEOSEIssued : ndbEOSEIssued && networkEOSEIssued + if canIssueEOSE { + continuation.yield(.eose) + } + } + let ndbStreamTask = Task { do { for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) { try Task.checkCancellation() switch item { case .eose: - continuation.yield(.eose) + Log.debug("Session subscribe: Received EOSE from nostrdb", for: .subscription_manager) + ndbEOSEIssued = true + yieldEOSEIfReady() case .event(let noteKey): let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey) try Task.checkCancellation() @@ -161,8 +178,14 @@ extension NostrNetworkManager { switch item { case .event(let event): Log.debug("Session subscribe: Received kind %d event with id %s from the network", for: .subscription_manager, event.kind, event.id.hex()) + if !self.experimentalLocalRelayModelSupport { + // In normal mode (non-experimental), we stream from ndb but also directly from the network + continuation.yield(.event(lender: NdbNoteLender(ownedNdbNote: event))) + } case .eose: Log.debug("Session subscribe: Received EOSE from the network", for: .subscription_manager) + networkEOSEIssued = true + yieldEOSEIfReady() } } } diff --git a/damus/Core/Nostr/RelayPool.swift b/damus/Core/Nostr/RelayPool.swift index 23f3042b..6b24a8e6 100644 --- a/damus/Core/Nostr/RelayPool.swift +++ b/damus/Core/Nostr/RelayPool.swift @@ -141,7 +141,7 @@ class RelayPool { case .string(let str) = msg else { return } - let _ = self.ndb.process_event(str, originRelayURL: relay_id) + let _ = self.ndb.processEvent(str, originRelayURL: relay_id) self.message_received_function?((str, desc)) }) let relay = Relay(descriptor: desc, connection: conn) diff --git a/damus/Core/Storage/DamusState.swift b/damus/Core/Storage/DamusState.swift index f663064a..f7170a03 100644 --- a/damus/Core/Storage/DamusState.swift +++ b/damus/Core/Storage/DamusState.swift @@ -223,6 +223,7 @@ fileprivate extension DamusState { var latestContactListEvent: NostrEvent? { self.contacts.event } var bootstrapRelays: [RelayURL] { get_default_bootstrap_relays() } var developerMode: Bool { self.settings.developer_mode } + var experimentalLocalRelayModelSupport: Bool { self.settings.enable_experimental_local_relay_model } var relayModelCache: RelayModelCache var relayFilters: RelayFilters diff --git a/damus/Features/Settings/Models/UserSettingsStore.swift b/damus/Features/Settings/Models/UserSettingsStore.swift index 0da7377c..eb01f7e6 100644 --- a/damus/Features/Settings/Models/UserSettingsStore.swift +++ b/damus/Features/Settings/Models/UserSettingsStore.swift @@ -243,6 +243,10 @@ class UserSettingsStore: ObservableObject { @Setting(key: "enable_experimental_purple_api", default_value: false) var enable_experimental_purple_api: Bool + /// Whether the app has the experimental local relay model flag that streams data only from the local relay (ndb) + @Setting(key: "enable_experimental_local_relay_model", default_value: false) + var enable_experimental_local_relay_model: Bool + @StringSetting(key: "purple_environment", default_value: .production) var purple_enviroment: DamusPurpleEnvironment diff --git a/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift b/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift index 271c6f15..de3eb078 100644 --- a/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift +++ b/damusTests/NostrNetworkManagerTests/NostrNetworkManagerTests.swift @@ -21,7 +21,7 @@ class NostrNetworkManagerTests: XCTestCase { let notesJSONL = getTestNotesJSONL() for noteText in notesJSONL.split(separator: "\n") { - let _ = damusState!.ndb.process_event("[\"EVENT\",\"subid\",\(String(noteText))]") + let _ = damusState!.ndb.processEvent("[\"EVENT\",\"subid\",\(String(noteText))]") } } diff --git a/nostrdb/Ndb+.swift b/nostrdb/Ndb+.swift index c1d138a9..79fc39c1 100644 --- a/nostrdb/Ndb+.swift +++ b/nostrdb/Ndb+.swift @@ -33,7 +33,7 @@ extension Ndb { return try self.was(noteKey: noteKey, seenOnAnyOf: relayUrls.map({ $0.absoluteString }), txn: txn) } - func process_event(_ str: String, originRelayURL: RelayURL? = nil) -> Bool { + func processEvent(_ str: String, originRelayURL: RelayURL? = nil) -> Bool { self.process_event(str, originRelayURL: originRelayURL?.absoluteString) } }