Implement initial negentropy base functions

This implements some useful functions to use negentropy from RelayPool,
but does not integrate them with the rest of the app.

No changelog for the negentropy support right now as it is not hooked up
to any user-facing feature

Changelog-Fixed: Fixed a race condition in the networking logic that could cause notes to get missed in certain rare scenarios
Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2026-01-12 12:47:06 -08:00
parent ac05b83772
commit 95d38fa802
14 changed files with 1034 additions and 24 deletions

View File

@@ -331,11 +331,11 @@ func sign_id(privkey: String, id: String) -> String {
}
func decode_nostr_event(txt: String) -> NostrResponse? {
return NostrResponse.owned_from_json(json: txt)
return NostrResponse.decode(from: txt)
}
func decode_and_verify_nostr_response(txt: String) -> NostrResponse? {
guard let response = NostrResponse.owned_from_json(json: txt) else { return nil }
guard let response = NostrResponse.decode(from: txt) else { return nil }
guard verify_nostr_response(response: response) == true else { return nil }
return response
}
@@ -352,6 +352,10 @@ func verify_nostr_response(response: borrowing NostrResponse) -> Bool {
return true
case .auth(_):
return true
case .negentropyError(subscriptionId: let subscriptionId, reasonCodeString: let reasonCodeString):
return true
case .negentropyMessage(subscriptionId: let subscriptionId, hexEncodedData: let hexEncodedData):
return true
}
}

View File

@@ -48,6 +48,12 @@ enum NostrRequest {
case event(NostrEvent)
/// Authenticate with the relay
case auth(NostrEvent)
/// Negentropy open
case negentropyOpen(subscriptionId: String, filter: NostrFilter, initialMessage: [UInt8])
/// Negentropy message
case negentropyMessage(subscriptionId: String, message: [UInt8])
/// Close negentropy communication
case negentropyClose(subscriptionId: String)
/// Whether this request is meant to write data to a relay
var is_write: Bool {
@@ -60,6 +66,12 @@ enum NostrRequest {
return true
case .auth:
return false
case .negentropyOpen:
return false
case .negentropyMessage:
return false
case .negentropyClose:
return false
}
}

View File

@@ -18,6 +18,23 @@ enum MaybeResponse {
case ok(NostrResponse)
}
enum NegentropyResponse {
/// Negentropy error
case error(subscriptionId: String, reasonCodeString: String)
/// Negentropy message
case message(subscriptionId: String, data: [UInt8])
/// Invalid negentropy message
case invalidResponse(subscriptionId: String)
var subscriptionId: String {
switch self {
case .error(subscriptionId: let subscriptionId, reasonCodeString: let reasonCodeString): subscriptionId
case .message(subscriptionId: let subscriptionId, data: let data): subscriptionId
case .invalidResponse(subscriptionId: let subscriptionId): subscriptionId
}
}
}
enum NostrResponse {
case event(String, NostrEvent)
case notice(String)
@@ -27,6 +44,10 @@ enum NostrResponse {
///
/// The associated type of this case is the challenge string sent by the server.
case auth(String)
/// Negentropy error
case negentropyError(subscriptionId: String, reasonCodeString: String)
/// Negentropy message
case negentropyMessage(subscriptionId: String, hexEncodedData: String)
var subid: String? {
switch self {
@@ -36,14 +57,84 @@ enum NostrResponse {
return sub_id
case .eose(let sub_id):
return sub_id
case .notice:
case .notice(_):
return nil
case .auth(let challenge_string):
return challenge_string
case .negentropyError(subscriptionId: let subscriptionId, reasonCodeString: _):
return subscriptionId
case .negentropyMessage(subscriptionId: let subscriptionId, hexEncodedData: _):
return subscriptionId
}
}
var negentropyResponse: NegentropyResponse? {
switch self {
case .event(_, _): return nil
case .notice(_): return nil
case .eose(_): return nil
case .ok(_): return nil
case .auth(_): return nil
case .negentropyError(subscriptionId: let subscriptionId, reasonCodeString: let reasonCodeString):
return .error(subscriptionId: subscriptionId, reasonCodeString: reasonCodeString)
case .negentropyMessage(subscriptionId: let subscriptionId, hexEncodedData: let hexData):
if let bytes = hex_decode(hexData) {
return .message(subscriptionId: subscriptionId, data: bytes)
}
return .invalidResponse(subscriptionId: subscriptionId)
}
}
/// Decode a Nostr response from JSON using idiomatic Swift parsing
/// Supports NEG-MSG and NEG-ERR formats, falling back to C parsing for other message types
static func decode(from json: String) -> NostrResponse? {
// Try Swift-based parsing first for negentropy messages
if let response = try? decodeNegentropyMessage(from: json) {
return response
}
// Fall back to C-based parsing for standard Nostr messages
return owned_from_json(json: json)
}
/// Decode negentropy messages using idiomatic Swift
private static func decodeNegentropyMessage(from json: String) throws -> NostrResponse? {
guard let jsonData = json.data(using: .utf8) else {
return nil
}
guard let jsonArray = try JSONSerialization.jsonObject(with: jsonData) as? [Any],
jsonArray.count >= 2,
let messageType = jsonArray[0] as? String else {
return nil
}
switch messageType {
case "NEG-MSG":
// Format: ["NEG-MSG", "subscription-id", "hex-encoded-data"]
guard jsonArray.count == 3,
let subscriptionId = jsonArray[1] as? String,
let hexData = jsonArray[2] as? String else {
return nil
}
return .negentropyMessage(subscriptionId: subscriptionId, hexEncodedData: hexData)
case "NEG-ERR":
// Format: ["NEG-ERR", "subscription-id", "reason-code"]
guard jsonArray.count == 3,
let subscriptionId = jsonArray[1] as? String,
let reasonCode = jsonArray[2] as? String else {
return nil
}
return .negentropyError(subscriptionId: subscriptionId, reasonCodeString: reasonCode)
default:
// Not a negentropy message
return nil
}
}
static func owned_from_json(json: String) -> NostrResponse? {
private static func owned_from_json(json: String) -> NostrResponse? {
return json.withCString{ cstr in
let bufsize: Int = max(Int(Double(json.utf8.count) * 8.0), Int(getpagesize()))
let data = malloc(bufsize)

View File

@@ -139,6 +139,11 @@ struct RelayMetadata: Codable {
var is_paid: Bool {
return limitation?.payment_required ?? false
}
var supports_negentropy: Bool? {
// Supports negentropy if NIP-77 is in the list of supported NIPs
supported_nips?.contains(where: { $0 == 77 })
}
}
extension RelayPool {

View File

@@ -7,6 +7,7 @@
import Combine
import Foundation
import Negentropy
enum NostrConnectionEvent {
/// Other non-message websocket events
@@ -61,6 +62,16 @@ final class RelayConnection: ObservableObject {
private var processEvent: (WebSocketEvent) -> ()
private let relay_url: RelayURL
var log: RelayLog?
/// The queue of WebSocket events to be processed
/// We need this queue to ensure events are processed and sent to RelayPool in the exact order in which they arrive.
/// See `processEventsTask()` for more information
var wsEventQueue: QueueableNotify<WebSocketEvent>
/// The task which will process WebSocket events in the order in which we receive them from the wire
var wsEventProcessTask: Task<Void, any Error>?
@RelayPoolActor // Isolate this to a specific actor to avoid thread-satefy issues.
var negentropyStreams: [String: AsyncStream<NegentropyResponse>.Continuation] = [:]
init(url: RelayURL,
handleEvent: @escaping (NostrConnectionEvent) async -> (),
@@ -69,6 +80,33 @@ final class RelayConnection: ObservableObject {
self.relay_url = url
self.handleEvent = handleEvent
self.processEvent = processUnverifiedWSEvent
self.wsEventQueue = .init(maxQueueItems: 1000)
self.wsEventProcessTask = nil
self.wsEventProcessTask = Task {
try await self.processEventsTask()
}
}
deinit {
self.wsEventProcessTask?.cancel()
}
/// The task that will stream the queue of WebSocket events to be processed
/// We need this in order to ensure events are processed and sent to RelayPool in the exact order in which they arrive.
///
/// We need this (or some equivalent syncing mechanism) because without it, two WebSocket events can be processed concurrently,
/// and sometimes sent in the wrong order due to difference in processing timing.
///
/// For example, streaming a filter that yields 1 event can cause the EOSE signal to arrive in RelayPool before the event, simply because the event
/// takes longer to process compared to the EOSE signal.
///
/// To prevent this, we send raw WebSocket events to this queue BEFORE any processing (to ensure equal timing),
/// and then process the queue in the order in which they appear
func processEventsTask() async throws {
for await item in await self.wsEventQueue.stream {
try Task.checkCancellation()
await self.receive(event: item)
}
}
func ping() {
@@ -104,12 +142,12 @@ final class RelayConnection: ObservableObject {
.sink { [weak self] completion in
switch completion {
case .failure(let error):
Task { await self?.receive(event: .error(error)) }
Task { await self?.wsEventQueue.add(item: .error(error)) }
case .finished:
Task { await self?.receive(event: .disconnected(.normalClosure, nil)) }
Task { await self?.wsEventQueue.add(item: .disconnected(.normalClosure, nil)) }
}
} receiveValue: { [weak self] event in
Task { await self?.receive(event: event) }
Task { await self?.wsEventQueue.add(item: event) }
}
socket.connect()
@@ -227,6 +265,9 @@ final class RelayConnection: ObservableObject {
// we will not need to verify nostr events at this point.
if let ev = decode_and_verify_nostr_response(txt: messageString) {
await self.handleEvent(.nostr_event(ev))
if let negentropyResponse = ev.negentropyResponse {
await self.negentropyStreams[negentropyResponse.subscriptionId]?.yield(negentropyResponse)
}
return
}
print("failed to decode event \(messageString)")
@@ -238,6 +279,94 @@ final class RelayConnection: ObservableObject {
print("An unexpected URLSessionWebSocketTask.Message was received.")
}
}
// MARK: - Negentropy logic
/// Retrieves the IDs of events missing locally compared to the relay using negentropy protocol.
///
/// - Parameters:
/// - filter: The Nostr filter to scope the sync
/// - negentropyVector: The local storage vector for comparison
/// - timeout: Optional timeout for the operation
/// - Returns: Array of IDs that the relay has but we don't
/// - Throws: NegentropySyncError on failure
@RelayPoolActor
func getMissingIds(filter: NostrFilter, negentropyVector: NegentropyStorageVector, timeout: Duration?) async throws -> [Id] {
if let relayMetadata = try? await fetch_relay_metadata(relay_id: self.relay_url),
let supportsNegentropy = relayMetadata.supports_negentropy {
if !supportsNegentropy {
// Throw an error if the relay specifically advertises that there is no support for negentropy
throw NegentropySyncError.notSupported
}
}
let timeout = timeout ?? .seconds(5)
let frameSizeLimit = 60_000 // Copied from rust-nostr project: Default frame limit is 128k. Halve that (hex encoding) and subtract a bit (JSON msg overhead)
try? negentropyVector.seal() // Error handling note: We do not care if it throws an `alreadySealed` error. As long as it is sealed in the end it is fine
let negentropyClient = try Negentropy(storage: negentropyVector, frameSizeLimit: frameSizeLimit)
let initialMessage = try negentropyClient.initiate()
let subscriptionId = UUID().uuidString
var allNeedIds: [Id] = []
for await response in negentropyStream(subscriptionId: subscriptionId, filter: filter, initialMessage: initialMessage, timeoutDuration: timeout) {
switch response {
case .error(subscriptionId: _, reasonCodeString: let reasonCodeString):
throw NegentropySyncError.genericError(reasonCodeString)
case .message(subscriptionId: _, data: let data):
var haveIds: [Id] = []
var needIds: [Id] = []
let nextMessage = try negentropyClient.reconcile(data, haveIds: &haveIds, needIds: &needIds)
allNeedIds.append(contentsOf: needIds)
if let nextMessage {
self.send(.typical(.negentropyMessage(subscriptionId: subscriptionId, message: nextMessage)))
}
else {
// Reconciliation is complete
return allNeedIds
}
case .invalidResponse(subscriptionId: _):
throw NegentropySyncError.relayError
}
}
// If the stream completes without a response, throw a timeout/relay error
throw NegentropySyncError.relayError
}
enum NegentropySyncError: Error {
/// Fallback generic error
case genericError(String)
/// Negentropy is not supported by the relay
case notSupported
/// Something went wrong with the relay communication during negentropy sync
case relayError
}
@RelayPoolActor
private func negentropyStream(subscriptionId: String, filter: NostrFilter, initialMessage: [UInt8], timeoutDuration: Duration? = nil) -> AsyncStream<NegentropyResponse> {
return AsyncStream<NegentropyResponse> { continuation in
self.negentropyStreams[subscriptionId] = continuation
let nostrRequest: NostrRequest = .negentropyOpen(subscriptionId: subscriptionId, filter: filter, initialMessage: initialMessage)
self.send(.typical(nostrRequest))
let timeoutTask = Task {
if let timeoutDuration {
try Task.checkCancellation()
try await Task.sleep(for: timeoutDuration)
try Task.checkCancellation()
continuation.finish()
}
}
continuation.onTermination = { @Sendable _ in
Task {
await self.removeNegentropyStream(id: subscriptionId)
self.send(.typical(.negentropyClose(subscriptionId: subscriptionId)))
}
timeoutTask.cancel()
}
}
}
@RelayPoolActor
private func removeNegentropyStream(id: String) {
self.negentropyStreams[id] = nil
}
}
func make_nostr_req(_ req: NostrRequest) -> String? {
@@ -250,6 +379,12 @@ func make_nostr_req(_ req: NostrRequest) -> String? {
return make_nostr_push_event(ev: ev)
case .auth(let ev):
return make_nostr_auth_event(ev: ev)
case .negentropyOpen(subscriptionId: let subscriptionId, filter: let filter, initialMessage: let initialMessage):
return make_nostr_negentropy_open_req(subscriptionId: subscriptionId, filter: filter, initialMessage: initialMessage)
case .negentropyMessage(subscriptionId: let subscriptionId, message: let message):
return make_nostr_negentropy_message_req(subscriptionId: subscriptionId, message: message)
case .negentropyClose(subscriptionId: let subscriptionId):
return make_nostr_negentropy_close_req(subscriptionId: subscriptionId)
}
}
@@ -289,3 +424,28 @@ func make_nostr_subscription_req(_ filters: [NostrFilter], sub_id: String) -> St
req += "]"
return req
}
func make_nostr_negentropy_open_req(subscriptionId: String, filter: NostrFilter, initialMessage: [UInt8]) -> String? {
let encoder = JSONEncoder()
let messageData = Data(initialMessage)
let messageHex = hex_encode(messageData)
var req = "[\"NEG-OPEN\",\"\(subscriptionId)\","
guard let filter_json = try? encoder.encode(filter) else {
return nil
}
let filter_json_str = String(decoding: filter_json, as: UTF8.self)
req += filter_json_str
req += ",\"\(messageHex)\""
req += "]"
return req
}
func make_nostr_negentropy_message_req(subscriptionId: String, message: [UInt8]) -> String? {
let messageData = Data(message)
let messageHex = hex_encode(messageData)
return "[\"NEG-MSG\",\"\(subscriptionId)\",\"\(messageHex)\"]"
}
func make_nostr_negentropy_close_req(subscriptionId: String) -> String? {
return "[\"NEG-CLOSE\",\"\(subscriptionId)\"]"
}

View File

@@ -7,6 +7,7 @@
import Foundation
import Network
import Negentropy
struct RelayHandler {
let sub_id: String
@@ -269,6 +270,12 @@ class RelayPool {
return true
case .auth(_):
return true
case .negentropyOpen(subscriptionId: _, filter: _, initialMessage: _):
return false // Do not persist negentropy requests across sessions
case .negentropyMessage(subscriptionId: _, message: _):
return false // Do not persist negentropy requests across sessions
case .negentropyClose(subscriptionId: _):
return false // Do not persist negentropy requests across sessions
}
}
}
@@ -339,6 +346,8 @@ class RelayPool {
}
case .ok(_): break // No need to handle this, we are not sending an event to the relay
case .auth(_): break // Handled in a separate function in RelayPool
case .negentropyError(subscriptionId: _, reasonCodeString: _): break // Not handled in regular subscriptions
case .negentropyMessage(subscriptionId: _, hexEncodedData: _): break // Not handled in regular subscriptions
}
}
}
@@ -366,6 +375,21 @@ class RelayPool {
}
}
/// This streams events that are pre-existing on the relay, and stops streaming as soon as it receives the EOSE signal.
func subscribeExistingItems(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, eoseTimeout: Duration? = nil, id: UUID? = nil) -> AsyncStream<NostrEvent> {
return AsyncStream<NostrEvent>.with(task: { continuation in
outerLoop: for await item in await self.subscribe(filters: filters, to: desiredRelays, eoseTimeout: eoseTimeout, id: id) {
if Task.isCancelled { return }
switch item {
case .event(let event):
continuation.yield(event)
case .eose:
break outerLoop
}
}
})
}
enum StreamItem {
/// A Nostr event
case event(NostrEvent)
@@ -551,6 +575,115 @@ class RelayPool {
handler.handler.yield((relay_id, event))
}
}
// MARK: - Negentropy
/// This streams items in the following fashion:
/// 1. Performs a negentropy sync, sending missing notes to the stream
/// 2. Send EOSE to signal end of syncing
/// 3. Stream new notes
func negentropySubscribe(
filters: [NostrFilter],
to desiredRelayURLs: [RelayURL]? = nil,
negentropyVector: NegentropyStorageVector,
eoseTimeout: Duration? = nil,
id: UUID? = nil,
ignoreUnsupportedRelays: Bool
) async throws -> AsyncThrowingStream<StreamItem, any Error> {
return AsyncThrowingStream<StreamItem, any Error>.with(task: { continuation in
// 1. Mark the time when we begin negentropy syncing
let negentropyStartTimestamp = UInt32(Date().timeIntervalSince1970)
// 2. Negentropy sync missing notes and send the missing notes over
for try await event in try await self.negentropySync(filters: filters, to: desiredRelayURLs, negentropyVector: negentropyVector, ignoreUnsupportedRelays: ignoreUnsupportedRelays) {
continuation.yield(.event(event))
}
// 3. When syncing is done, send the EOSE signal
continuation.yield(.eose)
// 3. Stream new notes that match the filter
let updatedFilters = filters.map({ filter in
var newFilter = filter
newFilter.since = negentropyStartTimestamp
return newFilter
})
for await item in await self.subscribe(filters: updatedFilters, to: desiredRelayURLs, eoseTimeout: eoseTimeout, id: id) {
try Task.checkCancellation()
switch item {
case .event(let nostrEvent):
continuation.yield(.event(nostrEvent))
case .eose:
continue // We already sent the EOSE signal after negentropy sync, ignore this redundant EOSE
}
}
})
}
/// This performs a negentropy syncing with various relays and various filters and sends missing notes over an async stream
func negentropySync(
filters: [NostrFilter],
to desiredRelayURLs: [RelayURL]? = nil,
negentropyVector: NegentropyStorageVector,
eoseTimeout: Duration? = nil,
ignoreUnsupportedRelays: Bool
) async throws -> AsyncThrowingStream<NostrEvent, any Error> {
return AsyncThrowingStream<NostrEvent, any Error>.with(task: { continuation in
for filter in filters {
try Task.checkCancellation()
for try await event in try await self.negentropySync(filter: filter, to: desiredRelayURLs, negentropyVector: negentropyVector, eoseTimeout: eoseTimeout, ignoreUnsupportedRelays: ignoreUnsupportedRelays) {
try Task.checkCancellation()
continuation.yield(event)
// Note: Negentropy vector already updated by the underlying stream, since it is a reference type
try Task.checkCancellation()
}
}
})
}
/// This performs a negentropy syncing with various relays and sends missing notes over an async stream
func negentropySync(
filter: NostrFilter,
to desiredRelayURLs: [RelayURL]? = nil,
negentropyVector: NegentropyStorageVector,
eoseTimeout: Duration? = nil,
ignoreUnsupportedRelays: Bool
) async throws -> AsyncThrowingStream<NostrEvent, any Error> {
return AsyncThrowingStream<NostrEvent, any Error>.with(task: { continuation in
let desiredRelays = await self.getRelays(targetRelays: desiredRelayURLs)
for desiredRelay in desiredRelays {
try Task.checkCancellation()
do {
for try await event in try await self.negentropySync(filter: filter, to: desiredRelay, negentropyVector: negentropyVector, eoseTimeout: eoseTimeout) {
try Task.checkCancellation()
continuation.yield(event)
// Add to our negentropy vector so that we don't need to receive it from the next relay!
negentropyVector.unseal()
try negentropyVector.insert(nostrEvent: event)
try Task.checkCancellation()
}
}
catch {
if let negentropyError = error as? RelayConnection.NegentropySyncError,
case .notSupported = negentropyError,
ignoreUnsupportedRelays {
// Do not throw error, ignore the relays that do not support negentropy
}
else {
throw error
}
}
}
})
}
/// This performs a negentropy syncing with one relay and sends missing notes over an async stream
func negentropySync(filter: NostrFilter, to desiredRelay: Relay, negentropyVector: NegentropyStorageVector, eoseTimeout: Duration? = nil) async throws -> AsyncThrowingStream<NostrEvent, any Error> {
return AsyncThrowingStream<NostrEvent, any Error>.with(task: { streamContinuation in
let missingIds = try await desiredRelay.connection.getMissingIds(filter: filter, negentropyVector: negentropyVector, timeout: eoseTimeout)
let missingIdsFilter = NostrFilter(ids: missingIds.map { NoteId($0.toData()) })
for await event in self.subscribeExistingItems(filters: [missingIdsFilter], to: [desiredRelay.descriptor.url], eoseTimeout: eoseTimeout) {
streamContinuation.yield(event)
}
})
}
}
func add_rw_relay(_ pool: RelayPool, _ url: RelayURL) async {