Stream from both NDB and network relays
This commit takes a step back from the full local relay model by treating NostrDB as one of the many relays streamed from, instead of the one exclusive relay that other classes rely on. This was done to reduce regression risk from the local relay model migration, without discarding the migration work already done. The full "local relay model" behavior (exclusive NDB streaming) was hidden behind a feature flag for easy migration later on. Closes: https://github.com/damus-io/damus/issues/3225 Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
@@ -38,7 +38,7 @@ class NostrNetworkManager {
|
|||||||
self.delegate = delegate
|
self.delegate = delegate
|
||||||
let pool = RelayPool(ndb: delegate.ndb, keypair: delegate.keypair)
|
let pool = RelayPool(ndb: delegate.ndb, keypair: delegate.keypair)
|
||||||
self.pool = pool
|
self.pool = pool
|
||||||
let reader = SubscriptionManager(pool: pool, ndb: delegate.ndb)
|
let reader = SubscriptionManager(pool: pool, ndb: delegate.ndb, experimentalLocalRelayModelSupport: self.delegate.experimentalLocalRelayModelSupport)
|
||||||
let userRelayList = UserRelayListManager(delegate: delegate, pool: pool, reader: reader)
|
let userRelayList = UserRelayListManager(delegate: delegate, pool: pool, reader: reader)
|
||||||
self.reader = reader
|
self.reader = reader
|
||||||
self.userRelayList = userRelayList
|
self.userRelayList = userRelayList
|
||||||
@@ -174,6 +174,9 @@ extension NostrNetworkManager {
|
|||||||
/// Whether the app is in developer mode
|
/// Whether the app is in developer mode
|
||||||
var developerMode: Bool { get }
|
var developerMode: Bool { get }
|
||||||
|
|
||||||
|
/// Whether the app has the experimental local relay model flag that streams data only from the local relay (ndb)
|
||||||
|
var experimentalLocalRelayModelSupport: Bool { get }
|
||||||
|
|
||||||
/// The cache of relay model information
|
/// The cache of relay model information
|
||||||
var relayModelCache: RelayModelCache { get }
|
var relayModelCache: RelayModelCache { get }
|
||||||
|
|
||||||
|
|||||||
@@ -16,11 +16,13 @@ extension NostrNetworkManager {
|
|||||||
private let pool: RelayPool
|
private let pool: RelayPool
|
||||||
private var ndb: Ndb
|
private var ndb: Ndb
|
||||||
private var taskManager: TaskManager
|
private var taskManager: TaskManager
|
||||||
|
private let experimentalLocalRelayModelSupport: Bool
|
||||||
|
|
||||||
init(pool: RelayPool, ndb: Ndb) {
|
init(pool: RelayPool, ndb: Ndb, experimentalLocalRelayModelSupport: Bool) {
|
||||||
self.pool = pool
|
self.pool = pool
|
||||||
self.ndb = ndb
|
self.ndb = ndb
|
||||||
self.taskManager = TaskManager()
|
self.taskManager = TaskManager()
|
||||||
|
self.experimentalLocalRelayModelSupport = experimentalLocalRelayModelSupport
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: - Subscribing and Streaming data from Nostr
|
// MARK: - Subscribing and Streaming data from Nostr
|
||||||
@@ -127,13 +129,28 @@ extension NostrNetworkManager {
|
|||||||
/// - Returns: An async stream of nostr data
|
/// - Returns: An async stream of nostr data
|
||||||
private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream<StreamItem> {
|
private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream<StreamItem> {
|
||||||
return AsyncStream<StreamItem> { continuation in
|
return AsyncStream<StreamItem> { continuation in
|
||||||
|
var ndbEOSEIssued = false
|
||||||
|
var networkEOSEIssued = false
|
||||||
|
|
||||||
|
// This closure function issues (yields) an EOSE signal to the stream if all relevant conditions are met
|
||||||
|
let yieldEOSEIfReady = {
|
||||||
|
// In normal mode: Issuing EOSE requires EOSE from both NDB and the network, since they are all considered separate relays
|
||||||
|
// In experimental local relay model mode: Issuing EOSE requires only EOSE from NDB, since that is the only relay that "matters"
|
||||||
|
let canIssueEOSE = self.experimentalLocalRelayModelSupport ? ndbEOSEIssued : ndbEOSEIssued && networkEOSEIssued
|
||||||
|
if canIssueEOSE {
|
||||||
|
continuation.yield(.eose)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let ndbStreamTask = Task {
|
let ndbStreamTask = Task {
|
||||||
do {
|
do {
|
||||||
for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) {
|
for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) {
|
||||||
try Task.checkCancellation()
|
try Task.checkCancellation()
|
||||||
switch item {
|
switch item {
|
||||||
case .eose:
|
case .eose:
|
||||||
continuation.yield(.eose)
|
Log.debug("Session subscribe: Received EOSE from nostrdb", for: .subscription_manager)
|
||||||
|
ndbEOSEIssued = true
|
||||||
|
yieldEOSEIfReady()
|
||||||
case .event(let noteKey):
|
case .event(let noteKey):
|
||||||
let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey)
|
let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey)
|
||||||
try Task.checkCancellation()
|
try Task.checkCancellation()
|
||||||
@@ -161,8 +178,14 @@ extension NostrNetworkManager {
|
|||||||
switch item {
|
switch item {
|
||||||
case .event(let event):
|
case .event(let event):
|
||||||
Log.debug("Session subscribe: Received kind %d event with id %s from the network", for: .subscription_manager, event.kind, event.id.hex())
|
Log.debug("Session subscribe: Received kind %d event with id %s from the network", for: .subscription_manager, event.kind, event.id.hex())
|
||||||
|
if !self.experimentalLocalRelayModelSupport {
|
||||||
|
// In normal mode (non-experimental), we stream from ndb but also directly from the network
|
||||||
|
continuation.yield(.event(lender: NdbNoteLender(ownedNdbNote: event)))
|
||||||
|
}
|
||||||
case .eose:
|
case .eose:
|
||||||
Log.debug("Session subscribe: Received EOSE from the network", for: .subscription_manager)
|
Log.debug("Session subscribe: Received EOSE from the network", for: .subscription_manager)
|
||||||
|
networkEOSEIssued = true
|
||||||
|
yieldEOSEIfReady()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -141,7 +141,7 @@ class RelayPool {
|
|||||||
case .string(let str) = msg
|
case .string(let str) = msg
|
||||||
else { return }
|
else { return }
|
||||||
|
|
||||||
let _ = self.ndb.process_event(str, originRelayURL: relay_id)
|
let _ = self.ndb.processEvent(str, originRelayURL: relay_id)
|
||||||
self.message_received_function?((str, desc))
|
self.message_received_function?((str, desc))
|
||||||
})
|
})
|
||||||
let relay = Relay(descriptor: desc, connection: conn)
|
let relay = Relay(descriptor: desc, connection: conn)
|
||||||
|
|||||||
@@ -223,6 +223,7 @@ fileprivate extension DamusState {
|
|||||||
var latestContactListEvent: NostrEvent? { self.contacts.event }
|
var latestContactListEvent: NostrEvent? { self.contacts.event }
|
||||||
var bootstrapRelays: [RelayURL] { get_default_bootstrap_relays() }
|
var bootstrapRelays: [RelayURL] { get_default_bootstrap_relays() }
|
||||||
var developerMode: Bool { self.settings.developer_mode }
|
var developerMode: Bool { self.settings.developer_mode }
|
||||||
|
var experimentalLocalRelayModelSupport: Bool { self.settings.enable_experimental_local_relay_model }
|
||||||
var relayModelCache: RelayModelCache
|
var relayModelCache: RelayModelCache
|
||||||
var relayFilters: RelayFilters
|
var relayFilters: RelayFilters
|
||||||
|
|
||||||
|
|||||||
@@ -243,6 +243,10 @@ class UserSettingsStore: ObservableObject {
|
|||||||
@Setting(key: "enable_experimental_purple_api", default_value: false)
|
@Setting(key: "enable_experimental_purple_api", default_value: false)
|
||||||
var enable_experimental_purple_api: Bool
|
var enable_experimental_purple_api: Bool
|
||||||
|
|
||||||
|
/// Whether the app has the experimental local relay model flag that streams data only from the local relay (ndb)
|
||||||
|
@Setting(key: "enable_experimental_local_relay_model", default_value: false)
|
||||||
|
var enable_experimental_local_relay_model: Bool
|
||||||
|
|
||||||
@StringSetting(key: "purple_environment", default_value: .production)
|
@StringSetting(key: "purple_environment", default_value: .production)
|
||||||
var purple_enviroment: DamusPurpleEnvironment
|
var purple_enviroment: DamusPurpleEnvironment
|
||||||
|
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ class NostrNetworkManagerTests: XCTestCase {
|
|||||||
let notesJSONL = getTestNotesJSONL()
|
let notesJSONL = getTestNotesJSONL()
|
||||||
|
|
||||||
for noteText in notesJSONL.split(separator: "\n") {
|
for noteText in notesJSONL.split(separator: "\n") {
|
||||||
let _ = damusState!.ndb.process_event("[\"EVENT\",\"subid\",\(String(noteText))]")
|
let _ = damusState!.ndb.processEvent("[\"EVENT\",\"subid\",\(String(noteText))]")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ extension Ndb {
|
|||||||
return try self.was(noteKey: noteKey, seenOnAnyOf: relayUrls.map({ $0.absoluteString }), txn: txn)
|
return try self.was(noteKey: noteKey, seenOnAnyOf: relayUrls.map({ $0.absoluteString }), txn: txn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func process_event(_ str: String, originRelayURL: RelayURL? = nil) -> Bool {
|
func processEvent(_ str: String, originRelayURL: RelayURL? = nil) -> Bool {
|
||||||
self.process_event(str, originRelayURL: originRelayURL?.absoluteString)
|
self.process_event(str, originRelayURL: originRelayURL?.absoluteString)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user