From d8f4dbb2aabc1e8f4e6b74f25c21825c9e930f5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20D=E2=80=99Aquino?= Date: Fri, 16 Jan 2026 17:12:49 -0800 Subject: [PATCH] Integrate Negentropy with Subscription Manager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This makes negentropy optimizations available to the rest of the app via Subscription Manager. Changelog not needed because this should not have user-facing changes Changelog-None Signed-off-by: Daniel D’Aquino --- damus.xcodeproj/project.pbxproj | 6 + .../NostrNetworkManager/ProfilesManager.swift | 2 +- .../SubscriptionManager.swift | 101 +++- damus/Core/Nostr/NostrRequest.swift | 82 +++ damus/Core/Nostr/RelayConnection.swift | 81 --- .../Features/Timeline/Models/HomeModel.swift | 6 +- .../Utilities/NegentropyUtilities.swift | 14 + .../SubscriptionManagerNegentropyTests.swift | 494 ++++++++++++++++++ nostrdb/Ndb+.swift | 18 + 9 files changed, 698 insertions(+), 106 deletions(-) create mode 100644 damusTests/SubscriptionManagerNegentropyTests.swift diff --git a/damus.xcodeproj/project.pbxproj b/damus.xcodeproj/project.pbxproj index 228f8348..abfa77df 100644 --- a/damus.xcodeproj/project.pbxproj +++ b/damus.xcodeproj/project.pbxproj @@ -1707,6 +1707,8 @@ D77DA2C82F19D480000B7093 /* NegentropyUtilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = D77DA2C72F19D452000B7093 /* NegentropyUtilities.swift */; }; D77DA2C92F19D480000B7093 /* NegentropyUtilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = D77DA2C72F19D452000B7093 /* NegentropyUtilities.swift */; }; D77DA2CA2F19D480000B7093 /* NegentropyUtilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = D77DA2C72F19D452000B7093 /* NegentropyUtilities.swift */; }; + D77DA2CE2F1C2596000B7093 /* SubscriptionManagerNegentropyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D77DA2CD2F1C2596000B7093 /* SubscriptionManagerNegentropyTests.swift */; }; + D77DA2D42F1EEB47000B7093 /* 
NostrRequest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4CEE2AEC2805B22500AB5EEF /* NostrRequest.swift */; }; D783A63F2AD4E53D00658DDA /* SuggestedHashtagsView.swift in Sources */ = {isa = PBXBuildFile; fileRef = D783A63E2AD4E53D00658DDA /* SuggestedHashtagsView.swift */; }; D78525252A7B2EA4002FA637 /* NoteContentViewTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D78525242A7B2EA4002FA637 /* NoteContentViewTests.swift */; }; D7870BC12AC4750B0080BA88 /* MentionView.swift in Sources */ = {isa = PBXBuildFile; fileRef = D7870BC02AC4750B0080BA88 /* MentionView.swift */; }; @@ -2821,6 +2823,7 @@ D77BFA0A2AE3051200621634 /* ProfileActionSheetView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ProfileActionSheetView.swift; sourceTree = ""; }; D77DA2C32F19CA40000B7093 /* AsyncStreamUtilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AsyncStreamUtilities.swift; sourceTree = ""; }; D77DA2C72F19D452000B7093 /* NegentropyUtilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NegentropyUtilities.swift; sourceTree = ""; }; + D77DA2CD2F1C2596000B7093 /* SubscriptionManagerNegentropyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SubscriptionManagerNegentropyTests.swift; sourceTree = ""; }; D783A63E2AD4E53D00658DDA /* SuggestedHashtagsView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SuggestedHashtagsView.swift; sourceTree = ""; }; D78525242A7B2EA4002FA637 /* NoteContentViewTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NoteContentViewTests.swift; sourceTree = ""; }; D7870BC02AC4750B0080BA88 /* MentionView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MentionView.swift; sourceTree = ""; }; @@ -3864,6 +3867,7 @@ 4CE6DEF627F7A08200C66700 /* damusTests */ = { isa = PBXGroup; children = ( + 
D77DA2CD2F1C2596000B7093 /* SubscriptionManagerNegentropyTests.swift */, D74723ED2F15B0D6002DA12A /* NegentropySupportTests.swift */, D72734292F089EE600F90677 /* NdbMigrationTests.swift */, D72734272F08912F00F90677 /* DatabaseSnapshotManagerTests.swift */, @@ -6335,6 +6339,7 @@ D727342A2F089EEE00F90677 /* NdbMigrationTests.swift in Sources */, D7CBD1D62B8D509800BFD889 /* DamusPurpleImpendingExpirationTests.swift in Sources */, D7EBF8BB2E59022A004EAE29 /* NostrNetworkManagerTests.swift in Sources */, + D77DA2CE2F1C2596000B7093 /* SubscriptionManagerNegentropyTests.swift in Sources */, D7DEEF2F2A8C021E00E0C99F /* NostrEventTests.swift in Sources */, 4C8D00D429E3C5D40036AF10 /* NIP19Tests.swift in Sources */, 3A30410129AB12AA008A0F29 /* EventGroupViewTests.swift in Sources */, @@ -7535,6 +7540,7 @@ D798D21F2B0858D600234419 /* MigratedTypes.swift in Sources */, D7CE1B472B0BE719002EDAD4 /* NativeObject.swift in Sources */, D71AD9002CEC176A002E2C3C /* AppAccessibilityIdentifiers.swift in Sources */, + D77DA2D42F1EEB47000B7093 /* NostrRequest.swift in Sources */, D7CB5D552B11758A00AD4105 /* UnmuteThreadNotify.swift in Sources */, D7CCFC192B058A3F00323D86 /* Block.swift in Sources */, D7CCFC112B05884E00323D86 /* AsciiCharacter.swift in Sources */, diff --git a/damus/Core/Networking/NostrNetworkManager/ProfilesManager.swift b/damus/Core/Networking/NostrNetworkManager/ProfilesManager.swift index 41e73775..e2dbe795 100644 --- a/damus/Core/Networking/NostrNetworkManager/ProfilesManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/ProfilesManager.swift @@ -81,7 +81,7 @@ extension NostrNetworkManager { guard pubkeys.count > 0 else { return } let profileFilter = NostrFilter(kinds: [.metadata], authors: pubkeys) try Task.checkCancellation() - for await ndbLender in self.subscriptionManager.streamIndefinitely(filters: [profileFilter], streamMode: .ndbFirst(optimizeNetworkFilter: false)) { + for await ndbLender in self.subscriptionManager.streamIndefinitely(filters: 
[profileFilter], streamMode: .ndbFirst(networkOptimization: nil)) { try Task.checkCancellation() try? ndbLender.borrow { ev in publishProfileUpdates(metadataEvent: ev) diff --git a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift index 63f5b804..7b25fd29 100644 --- a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift @@ -6,6 +6,7 @@ // import Foundation import os +import Negentropy extension NostrNetworkManager { @@ -138,20 +139,18 @@ extension NostrNetworkManager { var networkStreamTask: Task? = nil var latestNoteTimestampSeen: UInt32? = nil + var negentropyStorageVector = NegentropyStorageVector() let startNetworkStreamTask = { guard streamMode.shouldStreamFromNetwork else { return } networkStreamTask = Task { while !Task.isCancelled { - let optimizedFilters = filters.map { - var optimizedFilter = $0 - // Shift the since filter 2 minutes (120 seconds) before the last note timestamp - if let latestTimestamp = latestNoteTimestampSeen { - optimizedFilter.since = latestTimestamp > 120 ? 
latestTimestamp - 120 : 0 - } - return optimizedFilter - } - for await item in self.multiSessionNetworkStream(filters: optimizedFilters, to: desiredRelays, streamMode: streamMode, id: id) { + let networkOptimizationData = StreamMode.NetworkOptimizationData.from( + strategy: streamMode.networkOptimizationStrategy, + latestNoteTimestampSeen: latestNoteTimestampSeen, + negentropyStorageVector: negentropyStorageVector + ) + for await item in self.multiSessionNetworkStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id, networkOptimizationData: networkOptimizationData) { try Task.checkCancellation() logStreamPipelineStats("SubscriptionManager_Network_Stream_\(id)", "SubscriptionManager_Advanced_Stream_\(id)") switch item { @@ -193,6 +192,7 @@ extension NostrNetworkManager { else { latestNoteTimestampSeen = event.createdAt } + negentropyStorageVector.unsealAndInsert(nostrEvent: event) }) continuation.yield(item) case .eose: @@ -219,7 +219,7 @@ extension NostrNetworkManager { } } - private func multiSessionNetworkStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { + private func multiSessionNetworkStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil, networkOptimizationData: StreamMode.NetworkOptimizationData?) -> AsyncStream { let id = id ?? UUID() let streamMode = streamMode ?? 
defaultStreamMode() return AsyncStream { continuation in @@ -234,7 +234,7 @@ extension NostrNetworkManager { } do { - for await item in await self.pool.subscribe(filters: filters, to: desiredRelays, id: id) { + for await item in await self.sessionNetworkStreamWithOptimization(filters: filters, to: desiredRelays, id: id, networkOptimizationData: networkOptimizationData) { try Task.checkCancellation() logStreamPipelineStats("RelayPool_Handler_\(id)", "SubscriptionManager_Network_Stream_\(id)") switch item { @@ -264,6 +264,36 @@ extension NostrNetworkManager { } } + /// Stream from the network with some optional optimization + private func sessionNetworkStreamWithOptimization(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, id: UUID? = nil, networkOptimizationData: StreamMode.NetworkOptimizationData?) async -> AsyncStream { + guard let networkOptimizationData else { + // No optimization, just return a regular RelayPool subscription + return await self.pool.subscribe(filters: filters, to: desiredRelays, id: id) + } + switch networkOptimizationData { + case .sinceOptimization(let latestNoteTimestampSeen): + let optimizedFilters = filters.map { + var optimizedFilter = $0 + // Shift the since filter 2 minutes (120 seconds) before the last note timestamp + optimizedFilter.since = latestNoteTimestampSeen > 120 ? latestNoteTimestampSeen - 120 : 0 + return optimizedFilter + } + return await self.pool.subscribe(filters: optimizedFilters, to: desiredRelays, id: id) + case .negentropy(let negentropyStorageVector): + return AsyncStream.with(task: { continuation in + let id = id ?? 
UUID() + do { + for try await item in try await self.pool.negentropySubscribe(filters: filters, to: desiredRelays, negentropyVector: negentropyStorageVector, id: id, ignoreUnsupportedRelays: true) { + continuation.yield(item) + } + } + catch { + Self.logger.error("Network subscription \(id.uuidString, privacy: .public): Streaming error: \(error.localizedDescription, privacy: .public)") + } + }) + } + } + private func multiSessionNdbStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { return AsyncStream { continuation in let subscriptionId = id ?? UUID() @@ -350,7 +380,8 @@ extension NostrNetworkManager { // MARK: - Utility functions private func defaultStreamMode() -> StreamMode { - self.experimentalLocalRelayModelSupport ? .ndbFirst(optimizeNetworkFilter: false) : .ndbAndNetworkParallel(optimizeNetworkFilter: false) + // Note: Network optimizations disabled by default for now because we need more testing to understand the effects of turning them on by default. + self.experimentalLocalRelayModelSupport ? .ndbFirst(networkOptimization: nil) : .ndbAndNetworkParallel(networkOptimization: nil) } // MARK: - Finding specific data from Nostr @@ -531,22 +562,24 @@ extension NostrNetworkManager { /// The mode of streaming enum StreamMode { /// Returns notes exclusively through NostrDB, treating it as the only channel for information in the pipeline. Generic EOSE is fired when EOSE is received from NostrDB - /// `optimizeNetworkFilter`: Returns notes from ndb, then streams from the network with an added "since" filter set to the latest note stored on ndb. - case ndbFirst(optimizeNetworkFilter: Bool) + case ndbFirst(networkOptimization: NetworkOptimizationStrategy?) /// Returns notes from both NostrDB and the network, in parallel, treating it with similar importance against the network relays. 
Generic EOSE is fired when EOSE is received from both the network and NostrDB - /// `optimizeNetworkFilter`: Returns notes from ndb, then streams from the network with an added "since" filter set to the latest note stored on ndb. - case ndbAndNetworkParallel(optimizeNetworkFilter: Bool) + case ndbAndNetworkParallel(networkOptimization: NetworkOptimizationStrategy?) /// Ignores the network. case ndbOnly var optimizeNetworkFilter: Bool { + return networkOptimizationStrategy != nil + } + + var networkOptimizationStrategy: NetworkOptimizationStrategy? { switch self { - case .ndbFirst(optimizeNetworkFilter: let optimizeNetworkFilter): - return optimizeNetworkFilter - case .ndbAndNetworkParallel(optimizeNetworkFilter: let optimizeNetworkFilter): - return optimizeNetworkFilter + case .ndbFirst(networkOptimization: let networkOptimization): + return networkOptimization + case .ndbAndNetworkParallel(networkOptimization: let networkOptimization): + return networkOptimization case .ndbOnly: - return false + return nil } } @@ -560,5 +593,31 @@ extension NostrNetworkManager { return false } } + + enum NetworkOptimizationStrategy { + /// Returns notes from ndb, then streams from the network with an added "since" filter set to the latest note stored on ndb. + case sinceOptimization + /// Returns notes from ndb, negentropy syncs missing notes with relays, then streams normally + case negentropy + } + + enum NetworkOptimizationData { + /// Returns notes from ndb, then streams from the network with an added "since" filter set to the latest note stored on ndb. + case sinceOptimization(latestNoteTimestampSeen: UInt32) + /// Returns notes from ndb, negentropy syncs missing notes with relays, then streams normally + case negentropy(negentropyStorageVector: NegentropyStorageVector) + + static func from(strategy: NetworkOptimizationStrategy?, latestNoteTimestampSeen: UInt32?, negentropyStorageVector: NegentropyStorageVector?) -> Self? 
{ + guard let strategy else { return nil } + switch strategy { + case .sinceOptimization: + guard let latestNoteTimestampSeen else { return nil } + return .sinceOptimization(latestNoteTimestampSeen: latestNoteTimestampSeen) + case .negentropy: + guard let negentropyStorageVector else { return nil } + return .negentropy(negentropyStorageVector: negentropyStorageVector) + } + } + } } } diff --git a/damus/Core/Nostr/NostrRequest.swift b/damus/Core/Nostr/NostrRequest.swift index b8f790b5..eb163713 100644 --- a/damus/Core/Nostr/NostrRequest.swift +++ b/damus/Core/Nostr/NostrRequest.swift @@ -81,3 +81,85 @@ enum NostrRequest { } } + +func make_nostr_req(_ req: NostrRequest) -> String? { + switch req { + case .subscribe(let sub): + return make_nostr_subscription_req(sub.filters, sub_id: sub.sub_id) + case .unsubscribe(let sub_id): + return make_nostr_unsubscribe_req(sub_id) + case .event(let ev): + return make_nostr_push_event(ev: ev) + case .auth(let ev): + return make_nostr_auth_event(ev: ev) + case .negentropyOpen(subscriptionId: let subscriptionId, filter: let filter, initialMessage: let initialMessage): + return make_nostr_negentropy_open_req(subscriptionId: subscriptionId, filter: filter, initialMessage: initialMessage) + case .negentropyMessage(subscriptionId: let subscriptionId, message: let message): + return make_nostr_negentropy_message_req(subscriptionId: subscriptionId, message: message) + case .negentropyClose(subscriptionId: let subscriptionId): + return make_nostr_negentropy_close_req(subscriptionId: subscriptionId) + } +} + +func make_nostr_auth_event(ev: NostrEvent) -> String? { + guard let event = encode_json(ev) else { + return nil + } + let encoded = "[\"AUTH\",\(event)]" + print(encoded) + return encoded +} + +func make_nostr_push_event(ev: NostrEvent) -> String? 
{ + guard let event = encode_json(ev) else { + return nil + } + let encoded = "[\"EVENT\",\(event)]" + print(encoded) + return encoded +} + +func make_nostr_unsubscribe_req(_ sub_id: String) -> String? { + "[\"CLOSE\",\"\(sub_id)\"]" +} + +func make_nostr_subscription_req(_ filters: [NostrFilter], sub_id: String) -> String? { + let encoder = JSONEncoder() + var req = "[\"REQ\",\"\(sub_id)\"" + for filter in filters { + req += "," + guard let filter_json = try? encoder.encode(filter) else { + return nil + } + let filter_json_str = String(decoding: filter_json, as: UTF8.self) + req += filter_json_str + } + req += "]" + return req +} + +func make_nostr_negentropy_open_req(subscriptionId: String, filter: NostrFilter, initialMessage: [UInt8]) -> String? { + let encoder = JSONEncoder() + let messageData = Data(initialMessage) + let messageHex = hex_encode(messageData) + var req = "[\"NEG-OPEN\",\"\(subscriptionId)\"," + guard let filter_json = try? encoder.encode(filter) else { + return nil + } + let filter_json_str = String(decoding: filter_json, as: UTF8.self) + req += filter_json_str + req += ",\"\(messageHex)\"" + req += "]" + return req +} + +func make_nostr_negentropy_message_req(subscriptionId: String, message: [UInt8]) -> String? { + let messageData = Data(message) + let messageHex = hex_encode(messageData) + return "[\"NEG-MSG\",\"\(subscriptionId)\",\"\(messageHex)\"]" +} + +func make_nostr_negentropy_close_req(subscriptionId: String) -> String? { + return "[\"NEG-CLOSE\",\"\(subscriptionId)\"]" +} + diff --git a/damus/Core/Nostr/RelayConnection.swift b/damus/Core/Nostr/RelayConnection.swift index 94e1bb33..ecd92655 100644 --- a/damus/Core/Nostr/RelayConnection.swift +++ b/damus/Core/Nostr/RelayConnection.swift @@ -368,84 +368,3 @@ final class RelayConnection: ObservableObject { self.negentropyStreams[id] = nil } } - -func make_nostr_req(_ req: NostrRequest) -> String? 
{ - switch req { - case .subscribe(let sub): - return make_nostr_subscription_req(sub.filters, sub_id: sub.sub_id) - case .unsubscribe(let sub_id): - return make_nostr_unsubscribe_req(sub_id) - case .event(let ev): - return make_nostr_push_event(ev: ev) - case .auth(let ev): - return make_nostr_auth_event(ev: ev) - case .negentropyOpen(subscriptionId: let subscriptionId, filter: let filter, initialMessage: let initialMessage): - return make_nostr_negentropy_open_req(subscriptionId: subscriptionId, filter: filter, initialMessage: initialMessage) - case .negentropyMessage(subscriptionId: let subscriptionId, message: let message): - return make_nostr_negentropy_message_req(subscriptionId: subscriptionId, message: message) - case .negentropyClose(subscriptionId: let subscriptionId): - return make_nostr_negentropy_close_req(subscriptionId: subscriptionId) - } -} - -func make_nostr_auth_event(ev: NostrEvent) -> String? { - guard let event = encode_json(ev) else { - return nil - } - let encoded = "[\"AUTH\",\(event)]" - print(encoded) - return encoded -} - -func make_nostr_push_event(ev: NostrEvent) -> String? { - guard let event = encode_json(ev) else { - return nil - } - let encoded = "[\"EVENT\",\(event)]" - print(encoded) - return encoded -} - -func make_nostr_unsubscribe_req(_ sub_id: String) -> String? { - "[\"CLOSE\",\"\(sub_id)\"]" -} - -func make_nostr_subscription_req(_ filters: [NostrFilter], sub_id: String) -> String? { - let encoder = JSONEncoder() - var req = "[\"REQ\",\"\(sub_id)\"" - for filter in filters { - req += "," - guard let filter_json = try? encoder.encode(filter) else { - return nil - } - let filter_json_str = String(decoding: filter_json, as: UTF8.self) - req += filter_json_str - } - req += "]" - return req -} - -func make_nostr_negentropy_open_req(subscriptionId: String, filter: NostrFilter, initialMessage: [UInt8]) -> String? 
{ - let encoder = JSONEncoder() - let messageData = Data(initialMessage) - let messageHex = hex_encode(messageData) - var req = "[\"NEG-OPEN\",\"\(subscriptionId)\"," - guard let filter_json = try? encoder.encode(filter) else { - return nil - } - let filter_json_str = String(decoding: filter_json, as: UTF8.self) - req += filter_json_str - req += ",\"\(messageHex)\"" - req += "]" - return req -} - -func make_nostr_negentropy_message_req(subscriptionId: String, message: [UInt8]) -> String? { - let messageData = Data(message) - let messageHex = hex_encode(messageData) - return "[\"NEG-MSG\",\"\(subscriptionId)\",\"\(messageHex)\"]" -} - -func make_nostr_negentropy_close_req(subscriptionId: String) -> String? { - return "[\"NEG-CLOSE\",\"\(subscriptionId)\"]" -} diff --git a/damus/Features/Timeline/Models/HomeModel.swift b/damus/Features/Timeline/Models/HomeModel.swift index 9928b262..f86c25ef 100644 --- a/damus/Features/Timeline/Models/HomeModel.swift +++ b/damus/Features/Timeline/Models/HomeModel.swift @@ -584,7 +584,7 @@ class HomeModel: ContactsDelegate, ObservableObject { // fixing the race condition where onAppear fires before events arrive. 
for await item in damus_state.nostrNetwork.reader.advancedStream( filters: notifications_filters, - streamMode: .ndbAndNetworkParallel(optimizeNetworkFilter: true) + streamMode: .ndbAndNetworkParallel(networkOptimization: .sinceOptimization) ) { switch item { case .event(let lender): @@ -605,7 +605,7 @@ class HomeModel: ContactsDelegate, ObservableObject { } self.generalHandlerTask?.cancel() self.generalHandlerTask = Task { - for await item in damus_state.nostrNetwork.reader.advancedStream(filters: dms_filters + contacts_filters, streamMode: .ndbAndNetworkParallel(optimizeNetworkFilter: true)) { + for await item in damus_state.nostrNetwork.reader.advancedStream(filters: dms_filters + contacts_filters, streamMode: .ndbAndNetworkParallel(networkOptimization: .sinceOptimization)) { switch item { case .event(let lender): await lender.justUseACopy({ await process_event(ev: $0, context: .other) }) @@ -706,7 +706,7 @@ class HomeModel: ContactsDelegate, ObservableObject { DispatchQueue.main.async { self.loading = true } - for await item in damus_state.nostrNetwork.reader.advancedStream(filters: home_filters, streamMode: .ndbAndNetworkParallel(optimizeNetworkFilter: true), id: id) { + for await item in damus_state.nostrNetwork.reader.advancedStream(filters: home_filters, streamMode: .ndbAndNetworkParallel(networkOptimization: .sinceOptimization), id: id) { switch item { case .event(let lender): let currentTime = CFAbsoluteTimeGetCurrent() diff --git a/damus/Shared/Utilities/NegentropyUtilities.swift b/damus/Shared/Utilities/NegentropyUtilities.swift index 2a4c7b35..dd3545de 100644 --- a/damus/Shared/Utilities/NegentropyUtilities.swift +++ b/damus/Shared/Utilities/NegentropyUtilities.swift @@ -11,4 +11,18 @@ extension NegentropyStorageVector { func insert(nostrEvent: NostrEvent) throws { try self.insert(timestamp: UInt64(nostrEvent.created_at), id: Id(data: nostrEvent.id.id)) } + + func insert(nostrEvent: borrowing UnownedNdbNote) throws { + try self.insert(timestamp: 
UInt64(nostrEvent.createdAt), id: Id(data: nostrEvent.id.id)) + } + + func unsealAndInsert(nostrEvent: NostrEvent) { + self.unseal() + try? self.insert(nostrEvent: nostrEvent) + } + + func unsealAndInsert(nostrEvent: borrowing UnownedNdbNote) { + self.unseal() + try? self.insert(nostrEvent: nostrEvent) + } } diff --git a/damusTests/SubscriptionManagerNegentropyTests.swift b/damusTests/SubscriptionManagerNegentropyTests.swift new file mode 100644 index 00000000..56e66922 --- /dev/null +++ b/damusTests/SubscriptionManagerNegentropyTests.swift @@ -0,0 +1,494 @@ +// +// SubscriptionManagerNegentropyTests.swift +// damus +// +// Created by Daniel D'Aquino on 2026-01-17. +// + +import XCTest +import NostrSDK +import Negentropy +@testable import damus + +/// Tests for the SubscriptionManager's negentropy streaming mode functionality. +/// +/// These tests verify that SubscriptionManager correctly handles negentropy-based synchronization +/// by streaming events from NostrDB first, then efficiently syncing missing events from relays +/// using the negentropy protocol. +final class SubscriptionManagerNegentropyTests: XCTestCase { + + // MARK: - Helper Functions + + /// Creates and runs a local relay on the specified port. + /// - Parameter port: The port number to run the relay on + /// - Returns: The running LocalRelay instance + private func setupRelay(port: UInt16) async throws -> LocalRelay { + let builder = RelayBuilder().port(port: port) + let relay = LocalRelay(builder: builder) + try await relay.run() + print("Relay url: \(await relay.url())") + return relay + } + + /// Connects to a relay and waits for the connection to be established. + /// - Parameters: + /// - url: The relay URL to connect to + /// - label: Optional label for logging (e.g., "Relay1", "Relay2") + /// - Returns: The connected RelayConnection instance + private func connectToRelay(url: RelayURL, label: String = "") async -> RelayConnection { + var connectionContinuation: CheckedContinuation? 
+ + let relayConnection = RelayConnection(url: url, handleEvent: { _ in }, processUnverifiedWSEvent: { wsEvent in + let prefix = label.isEmpty ? "" : "(\(label)) " + switch wsEvent { + case .connected: + connectionContinuation?.resume() + case .message(let message): + print("SUBSCRIPTION_MANAGER_NEGENTROPY_TEST \(prefix): Received: \(message)") + case .disconnected(let closeCode, let string): + print("SUBSCRIPTION_MANAGER_NEGENTROPY_TEST \(prefix): Disconnected: \(closeCode); \(String(describing: string))") + case .error(let error): + print("SUBSCRIPTION_MANAGER_NEGENTROPY_TEST \(prefix): Received error: \(error)") + } + }) + relayConnection.connect() + + // Wait for connection to be established + await withCheckedContinuation { continuation in + connectionContinuation = continuation + } + + return relayConnection + } + + /// Sends events to a relay connection. + /// - Parameters: + /// - events: Array of NostrEvent to send + /// - connection: The RelayConnection to send events through + private func sendEvents(_ events: [NostrEvent], to connection: RelayConnection) { + for event in events { + connection.send(.typical(.event(event))) + } + } + + /// Sets up a NostrNetworkManager with the specified relay URLs. 
+ /// - Parameters: + /// - urls: Array of RelayURL to add to the manager + /// - ndb: The Ndb instance to use + /// - Returns: Configured and connected NostrNetworkManager + private func setupNetworkManager(with urls: [RelayURL], ndb: Ndb) async throws -> NostrNetworkManager { + let delegate = TestNetworkDelegate(ndb: ndb, keypair: test_keypair, bootstrapRelays: urls) + let networkManager = NostrNetworkManager(delegate: delegate, addNdbToRelayPool: true) + + // Manually add relays to the pool since we're bypassing normal initialization + for url in urls { + do { + try await networkManager.userRelayList.insert(relay: .init(url: url, rwConfiguration: .readWrite), force: true) + } + catch { + switch error { + case .relayAlreadyExists: continue + default: throw error + } + } + } + + await networkManager.userRelayList.connect() + // Wait for relay pool to be ready. + // It's generally not a good idea to hard code delays but RelayPool does not seem to provide any way to await for the connection to fully go through, + // or that mechanism is not well documented. + try await Task.sleep(for: .seconds(2)) + + return networkManager + } + + /// Stores events in NostrDB for testing purposes. + /// - Parameters: + /// - events: Array of NostrEvent to store in NDB + /// - ndb: The Ndb instance to store events in + private func storeEventsInNdb(_ events: [NostrEvent], ndb: Ndb) { + for event in events { + do { + try ndb.add(event: event) + } catch { + XCTFail("Failed to store event in NDB: \(error)") + } + } + } + + /// Runs a subscription manager stream and fulfills expectations based on received events and EOSE signals. 
+ /// - Parameters: + /// - networkManager: The network manager to subscribe through + /// - filters: The NostrFilters to apply + /// - streamMode: The stream mode to use for subscription + /// - ndbEventExpectations: Dictionary mapping event IDs to expectations for events from NDB + /// - negentropyEventExpectations: Dictionary mapping event IDs to expectations for events from negentropy + /// - ndbEoseExpectation: Optional expectation to fulfill when NDB EOSE is received + /// - networkEoseExpectation: Optional expectation to fulfill when network EOSE is received + /// - eoseExpectation: Optional expectation to fulfill when final EOSE is received + private func runAdvancedStream( + networkManager: NostrNetworkManager, + filters: [NostrFilter], + streamMode: NostrNetworkManager.StreamMode, + ndbEventExpectations: [NoteId: XCTestExpectation], + negentropyEventExpectations: [NoteId: XCTestExpectation], + ndbEoseExpectation: XCTestExpectation? = nil, + networkEoseExpectation: XCTestExpectation? = nil, + eoseExpectation: XCTestExpectation? = nil + ) { + Task { + var ndbEoseSeen = false + + for await item in networkManager.reader.advancedStream(filters: filters, streamMode: streamMode) { + switch item { + case .event(let lender): + try? lender.borrow { event in + // Check if this event came before or after NDB EOSE + if !ndbEoseSeen { + // Event came from NDB - verify it's expected from NDB + if negentropyEventExpectations[event.id] != nil { + XCTFail("Event \(event.id) arrived from NDB (before ndbEose) but was expected from negentropy (after ndbEose). This indicates incorrect streaming behavior.") + } + + if let expectation = ndbEventExpectations[event.id] { + expectation.fulfill() + } + } else { + // Event came from negentropy sync (after NDB EOSE) - verify it's expected from negentropy + if ndbEventExpectations[event.id] != nil { + XCTFail("Event \(event.id) arrived from negentropy (after ndbEose) but was expected from NDB (before ndbEose). 
This indicates incorrect streaming behavior.") + } + + if let expectation = negentropyEventExpectations[event.id] { + expectation.fulfill() + } + } + } + case .ndbEose: + ndbEoseSeen = true + ndbEoseExpectation?.fulfill() + case .networkEose: + networkEoseExpectation?.fulfill() + case .eose: + eoseExpectation?.fulfill() + return + } + } + } + } + + // MARK: - Test Cases + + /// Test basic negentropy streaming where local NDB has one event and relay has two events. + /// Should stream noteA from NDB first, then sync noteB via negentropy from the relay. + func testBasicNegentropyStreaming() async throws { + // Given: A relay with noteA and noteB, and local NDB has noteA + let relay = try await setupRelay(port: 9080) + let relayUrl = RelayURL(await relay.url().description)! + + let noteA = NostrEvent(content: "A", keypair: test_keypair)! + let noteB = NostrEvent(content: "B", keypair: test_keypair)! + + let relayConnection = await connectToRelay(url: relayUrl) + sendEvents([noteA, noteB], to: relayConnection) + + let ndb = await test_damus_state.ndb + storeEventsInNdb([noteA], ndb: ndb) + + let networkManager = try await setupNetworkManager(with: [relayUrl], ndb: ndb) + + let getsNoteAFromNdb = XCTestExpectation(description: "Gets note A from NDB before ndbEose") + let getsNoteBFromNegentropy = XCTestExpectation(description: "Gets note B via negentropy after ndbEose") + let ndbEose = XCTestExpectation(description: "Receives NDB EOSE") + let networkEose = XCTestExpectation(description: "Receives network EOSE") + + // When: Using negentropy streaming mode + runAdvancedStream( + networkManager: networkManager, + filters: [NostrFilter(kinds: [.text])], + streamMode: .ndbAndNetworkParallel(networkOptimization: .negentropy), + ndbEventExpectations: [noteA.id: getsNoteAFromNdb], + negentropyEventExpectations: [noteB.id: getsNoteBFromNegentropy], + ndbEoseExpectation: ndbEose, + networkEoseExpectation: networkEose + ) + + // Then: Should receive noteA from NDB, then ndbEose, 
then noteB via negentropy, then networkEose + await fulfillment(of: [getsNoteAFromNdb, ndbEose, getsNoteBFromNegentropy, networkEose], timeout: 10.0, enforceOrder: true) + } + + /// Test negentropy streaming with empty local storage. + /// Should sync all events from the relay via negentropy. + func testEmptyLocalStorageNegentropySync() async throws { + // Given: A relay with noteA and noteB, and empty local NDB + let relay = try await setupRelay(port: 9081) + let relayUrl = RelayURL(await relay.url().description)! + + let noteA = NostrEvent(content: "A", keypair: test_keypair)! + let noteB = NostrEvent(content: "B", keypair: test_keypair)! + + let relayConnection = await connectToRelay(url: relayUrl) + sendEvents([noteA, noteB], to: relayConnection) + + let ndb = await test_damus_state.ndb + // Note: Not storing any events in NDB - testing empty local storage + + let networkManager = try await setupNetworkManager(with: [relayUrl], ndb: ndb) + + let getsNoteAFromNegentropy = XCTestExpectation(description: "Gets note A via negentropy after ndbEose") + let getsNoteBFromNegentropy = XCTestExpectation(description: "Gets note B via negentropy after ndbEose") + let ndbEose = XCTestExpectation(description: "Receives NDB EOSE") + let networkEose = XCTestExpectation(description: "Receives network EOSE") + + // When: Using negentropy streaming mode with empty local storage + runAdvancedStream( + networkManager: networkManager, + filters: [NostrFilter(kinds: [.text])], + streamMode: .ndbAndNetworkParallel(networkOptimization: .negentropy), + ndbEventExpectations: [:], + negentropyEventExpectations: [noteA.id: getsNoteAFromNegentropy, noteB.id: getsNoteBFromNegentropy], + ndbEoseExpectation: ndbEose, + networkEoseExpectation: networkEose + ) + + // Then: Should receive ndbEose first, then all events via negentropy, then networkEose + // (Order not enforced because we don't make guarantees on the order of A/B) + await fulfillment(of: [ndbEose, getsNoteAFromNegentropy, 
getsNoteBFromNegentropy, networkEose], timeout: 10.0) + } + + /// Test negentropy streaming when all events are already synced locally. + /// Should stream events from NDB only, without syncing from relays. + func testAllEventsSyncedNegentropyMode() async throws { + // Given: A relay with noteA and noteB, and local NDB has both events + let relay = try await setupRelay(port: 9082) + let relayUrl = RelayURL(await relay.url().description)! + + let noteA = NostrEvent(content: "A", keypair: test_keypair)! + let noteB = NostrEvent(content: "B", keypair: test_keypair)! + + let relayConnection = await connectToRelay(url: relayUrl) + sendEvents([noteA, noteB], to: relayConnection) + + let ndb = await test_damus_state.ndb + storeEventsInNdb([noteA, noteB], ndb: ndb) + + let networkManager = try await setupNetworkManager(with: [relayUrl], ndb: ndb) + + let getsNoteAFromNdb = XCTestExpectation(description: "Gets note A from NDB before ndbEose") + let getsNoteBFromNdb = XCTestExpectation(description: "Gets note B from NDB before ndbEose") + let ndbEose = XCTestExpectation(description: "Receives NDB EOSE") + let networkEose = XCTestExpectation(description: "Receives network EOSE") + + // When: Using negentropy streaming mode with all events already synced + runAdvancedStream( + networkManager: networkManager, + filters: [NostrFilter(kinds: [.text])], + streamMode: .ndbAndNetworkParallel(networkOptimization: .negentropy), + ndbEventExpectations: [noteA.id: getsNoteAFromNdb, noteB.id: getsNoteBFromNdb], + negentropyEventExpectations: [:], + ndbEoseExpectation: ndbEose, + networkEoseExpectation: networkEose + ) + + // Then: Should receive all events from NDB before ndbEose, then networkEose (no new events to sync) + // (Order not enforced because we don't make guarantees on the order of A/B) + await fulfillment(of: [getsNoteAFromNdb, getsNoteBFromNdb, ndbEose, networkEose], timeout: 10.0) + } + + /// Test negentropy streaming with two relays having overlapping events. 
+ /// Relay1 has noteA+noteB, Relay2 has noteB+noteC, local NDB has noteB. + /// Should stream noteB from NDB, then sync noteA and noteC via negentropy (deduplicating noteB). + func testTwoRelaysWithOverlapNegentropySync() async throws { + // Given: Two relays with overlapping events and local NDB has noteB + let relay1 = try await setupRelay(port: 9083) + let relay2 = try await setupRelay(port: 9084) + + let relayUrl1 = RelayURL(await relay1.url().description)! + let relayUrl2 = RelayURL(await relay2.url().description)! + + let noteA = NostrEvent(content: "A", keypair: test_keypair)! + let noteB = NostrEvent(content: "B", keypair: test_keypair)! + let noteC = NostrEvent(content: "C", keypair: test_keypair)! + + // Connect to relay1 and send noteA + noteB + let relayConnection1 = await connectToRelay(url: relayUrl1, label: "Relay1") + sendEvents([noteA, noteB], to: relayConnection1) + + // Connect to relay2 and send noteB + noteC + let relayConnection2 = await connectToRelay(url: relayUrl2, label: "Relay2") + sendEvents([noteB, noteC], to: relayConnection2) + + let ndb = await test_damus_state.ndb + storeEventsInNdb([noteB], ndb: ndb) + + let networkManager = try await setupNetworkManager(with: [relayUrl1, relayUrl2], ndb: ndb) + + let getsNoteBFromNdb = XCTestExpectation(description: "Gets note B from NDB before ndbEose") + let getsNoteAFromNegentropy = XCTestExpectation(description: "Gets note A via negentropy after ndbEose") + let getsNoteCFromNegentropy = XCTestExpectation(description: "Gets note C via negentropy after ndbEose") + let ndbEose = XCTestExpectation(description: "Receives NDB EOSE") + let networkEose = XCTestExpectation(description: "Receives network EOSE") + + // When: Using negentropy streaming mode across two relays + runAdvancedStream( + networkManager: networkManager, + filters: [NostrFilter(kinds: [.text])], + streamMode: .ndbAndNetworkParallel(networkOptimization: .negentropy), + ndbEventExpectations: [noteB.id: getsNoteBFromNdb], + 
negentropyEventExpectations: [noteA.id: getsNoteAFromNegentropy, noteC.id: getsNoteCFromNegentropy], + ndbEoseExpectation: ndbEose, + networkEoseExpectation: networkEose + ) + + // Then: Should receive noteB from NDB, then ndbEose, then noteA and noteC via negentropy + // (Order not enforced because we don't make guarantees on the order of A/C) + await fulfillment(of: [getsNoteBFromNdb, ndbEose, getsNoteAFromNegentropy, getsNoteCFromNegentropy, networkEose], timeout: 10.0) + } + + /// Test negentropy streaming with three relays having different overlapping patterns. + /// Relay1 has A+B, Relay2 has B+C, Relay3 has C+D, local NDB has A+C. + /// Should stream A and C from NDB, then sync B and D via negentropy. + func testThreeRelaysPartialSyncNegentropy() async throws { + // Given: Three relays with overlapping events and local NDB has noteA and noteC + let relay1 = try await setupRelay(port: 9085) + let relay2 = try await setupRelay(port: 9086) + let relay3 = try await setupRelay(port: 9087) + + let relayUrl1 = RelayURL(await relay1.url().description)! + let relayUrl2 = RelayURL(await relay2.url().description)! + let relayUrl3 = RelayURL(await relay3.url().description)! + + let noteA = NostrEvent(content: "A", keypair: test_keypair)! + let noteB = NostrEvent(content: "B", keypair: test_keypair)! + let noteC = NostrEvent(content: "C", keypair: test_keypair)! + let noteD = NostrEvent(content: "D", keypair: test_keypair)! 
+ + // Connect to relay1 and send noteA + noteB + let relayConnection1 = await connectToRelay(url: relayUrl1, label: "Relay1") + sendEvents([noteA, noteB], to: relayConnection1) + + // Connect to relay2 and send noteB + noteC + let relayConnection2 = await connectToRelay(url: relayUrl2, label: "Relay2") + sendEvents([noteB, noteC], to: relayConnection2) + + // Connect to relay3 and send noteC + noteD + let relayConnection3 = await connectToRelay(url: relayUrl3, label: "Relay3") + sendEvents([noteC, noteD], to: relayConnection3) + + let ndb = await test_damus_state.ndb + storeEventsInNdb([noteA, noteC], ndb: ndb) + + let networkManager = try await setupNetworkManager(with: [relayUrl1, relayUrl2, relayUrl3], ndb: ndb) + + let getsNoteAFromNdb = XCTestExpectation(description: "Gets note A from NDB before ndbEose") + let getsNoteCFromNdb = XCTestExpectation(description: "Gets note C from NDB before ndbEose") + let getsNoteBFromNegentropy = XCTestExpectation(description: "Gets note B via negentropy after ndbEose") + let getsNoteDFromNegentropy = XCTestExpectation(description: "Gets note D via negentropy after ndbEose") + let ndbEose = XCTestExpectation(description: "Receives NDB EOSE") + let networkEose = XCTestExpectation(description: "Receives network EOSE") + + // When: Using negentropy streaming mode across three relays with partial overlap + runAdvancedStream( + networkManager: networkManager, + filters: [NostrFilter(kinds: [.text])], + streamMode: .ndbAndNetworkParallel(networkOptimization: .negentropy), + ndbEventExpectations: [noteA.id: getsNoteAFromNdb, noteC.id: getsNoteCFromNdb], + negentropyEventExpectations: [noteB.id: getsNoteBFromNegentropy, noteD.id: getsNoteDFromNegentropy], + ndbEoseExpectation: ndbEose, + networkEoseExpectation: networkEose + ) + + // Then: Should receive A and C from NDB, then ndbEose, then B and D via negentropy + // (Order not enforced because we don't make guarantees on the order of A/C and B/D) + await fulfillment(of: 
[getsNoteAFromNdb, getsNoteCFromNdb, ndbEose, getsNoteBFromNegentropy, getsNoteDFromNegentropy, networkEose], timeout: 10.0) + } + + /// Test negentropy streaming with multiple filters for different event kinds. + /// Relay1 has text notes A+B (kind 1), Relay2 has text B + DM C (kind 4), Relay3 has DMs C+D (kind 4). + /// Local NDB has text note A (kind 1) and DM C (kind 4). + /// Should stream A and C from NDB, then sync B and D via negentropy. + func testMultipleFiltersWithDifferentKindsNegentropy() async throws { + // Given: Three relays with mixed event kinds and local NDB has text note A and DM C + let relay1 = try await setupRelay(port: 9089) + let relay2 = try await setupRelay(port: 9090) + let relay3 = try await setupRelay(port: 9091) + + let relayUrl1 = RelayURL(await relay1.url().description)! + let relayUrl2 = RelayURL(await relay2.url().description)! + let relayUrl3 = RelayURL(await relay3.url().description)! + + // Create events with different kinds + // kind 1 = text notes, kind 4 = encrypted DMs + let noteA = NostrEvent(content: "A", keypair: test_keypair, kind: 1)! // text note + let noteB = NostrEvent(content: "B", keypair: test_keypair, kind: 1)! // text note + let noteC = NostrEvent(content: "C", keypair: test_keypair, kind: 4)! // DM + let noteD = NostrEvent(content: "D", keypair: test_keypair, kind: 4)! 
// DM + + // Connect to relay1 and send text notes A + B + let relayConnection1 = await connectToRelay(url: relayUrl1, label: "Relay1") + sendEvents([noteA, noteB], to: relayConnection1) + + // Connect to relay2 and send text note B + DM C + let relayConnection2 = await connectToRelay(url: relayUrl2, label: "Relay2") + sendEvents([noteB, noteC], to: relayConnection2) + + // Connect to relay3 and send DMs C + D + let relayConnection3 = await connectToRelay(url: relayUrl3, label: "Relay3") + sendEvents([noteC, noteD], to: relayConnection3) + + let ndb = await test_damus_state.ndb + storeEventsInNdb([noteA, noteC], ndb: ndb) + + let networkManager = try await setupNetworkManager(with: [relayUrl1, relayUrl2, relayUrl3], ndb: ndb) + + let getsNoteAFromNdb = XCTestExpectation(description: "Gets text note A from NDB before ndbEose") + let getsNoteCFromNdb = XCTestExpectation(description: "Gets DM C from NDB before ndbEose") + let getsNoteBFromNegentropy = XCTestExpectation(description: "Gets text note B via negentropy after ndbEose") + let getsNoteDFromNegentropy = XCTestExpectation(description: "Gets DM D via negentropy after ndbEose") + let ndbEose = XCTestExpectation(description: "Receives NDB EOSE") + let networkEose = XCTestExpectation(description: "Receives network EOSE") + + // When: Using negentropy streaming with multiple filters for different kinds + // Use two filters: one for kind 1 (text), one for kind 4 (DMs) + runAdvancedStream( + networkManager: networkManager, + filters: [ + NostrFilter(kinds: [.text]), // kind 1 + NostrFilter(kinds: [.dm]) // kind 4 + ], + streamMode: .ndbAndNetworkParallel(networkOptimization: .negentropy), + ndbEventExpectations: [noteA.id: getsNoteAFromNdb, noteC.id: getsNoteCFromNdb], + negentropyEventExpectations: [noteB.id: getsNoteBFromNegentropy, noteD.id: getsNoteDFromNegentropy], + ndbEoseExpectation: ndbEose, + networkEoseExpectation: networkEose + ) + + // Then: Should receive A and C from NDB, then ndbEose, then B and D via 
negentropy.
+        // (Order not enforced because we don't make guarantees on the order of A/C and B/D)
+        await fulfillment(of: [getsNoteAFromNdb, getsNoteCFromNdb, ndbEose, getsNoteBFromNegentropy, getsNoteDFromNegentropy, networkEose], timeout: 10.0)
+    }
+}
+
+// MARK: - Test Doubles
+
+/// Test delegate for NostrNetworkManager that provides minimal configuration for testing
+private final class TestNetworkDelegate: NostrNetworkManager.Delegate {
+    var ndb: Ndb
+    var keypair: Keypair
+    var latestRelayListEventIdHex: String?
+    var latestContactListEvent: NostrEvent?
+    var bootstrapRelays: [RelayURL]
+    var developerMode: Bool = false
+    var experimentalLocalRelayModelSupport: Bool = false
+    var relayModelCache: RelayModelCache
+    var relayFilters: RelayFilters
+    var nwcWallet: WalletConnectURL?
+
+    init(ndb: Ndb, keypair: Keypair, bootstrapRelays: [RelayURL]) {
+        self.ndb = ndb
+        self.keypair = keypair
+        self.bootstrapRelays = bootstrapRelays
+        self.relayModelCache = RelayModelCache()
+        self.relayFilters = RelayFilters(our_pubkey: keypair.pubkey)
+    }
+}
diff --git a/nostrdb/Ndb+.swift b/nostrdb/Ndb+.swift
index ed901e0f..a8f116ba 100644
--- a/nostrdb/Ndb+.swift
+++ b/nostrdb/Ndb+.swift
@@ -36,4 +36,22 @@ extension Ndb {
     func processEvent(_ str: String, originRelayURL: RelayURL? = nil) -> Bool {
         self.process_event(str, originRelayURL: originRelayURL?.absoluteString)
     }
+
+    /// Adds a NostrEvent to the database by converting it to a push event and processing it. 
+ /// - Parameter event: The NostrEvent to add + /// - Throws: NdbAddError.couldNotMakePushEvent if the event cannot be converted, or NdbAddError.processingFailed if processing fails + func add(event: NostrEvent) throws { + guard let nostrPushEvent = make_nostr_push_event(ev: event) else { + throw NdbAddError.couldNotMakePushEvent + } + let success = self.process_client_event(nostrPushEvent) + if !success { + throw NdbAddError.processingFailed + } + } + + enum NdbAddError: Error { + case couldNotMakePushEvent + case processingFailed + } }