New async streaming interface from RelayPool

This defines a higher level and easier to use streaming interface from
RelayPool.

Changelog-None
Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-03-24 22:05:07 -03:00
parent ffc75772f9
commit 130bbfafb4

View File

@@ -202,6 +202,64 @@ class RelayPool {
register_handler(sub_id: sub_id, handler: handler)
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
}
/// Subscribes to data from the `RelayPool` based on a filter and a list of desired relays.
///
/// - Parameters:
/// - filters: The filters specifying the desired content.
/// - desiredRelays: The desired relays which to subsctibe to. If `nil`, it defaults to the `RelayPool`'s default list
/// - eoseTimeout: The maximum timeout which to give up waiting for the eoseSignal, in seconds
/// - Returns: Returns an async stream that callers can easily consume via a for-loop
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, eoseTimeout: TimeInterval = 10) -> AsyncStream<StreamItem> {
let desiredRelays = desiredRelays ?? self.relays.map({ $0.descriptor.url })
return AsyncStream<StreamItem> { continuation in
let sub_id = UUID().uuidString
var seenEvents: Set<NoteId> = []
var relaysWhoFinishedInitialResults: Set<RelayURL> = []
var eoseSent = false
self.subscribe(sub_id: sub_id, filters: filters, handler: { (relayUrl, connectionEvent) in
switch connectionEvent {
case .ws_event(let ev):
// Websocket events such as connect/disconnect/error are already handled in `RelayConnection`. Do not perform any handling here.
// For the future, perhaps we should abstract away `.ws_event` in `RelayPool`? Seems like something to be handled on the `RelayConnection` layer.
break
case .nostr_event(let nostrResponse):
guard nostrResponse.subid == sub_id else { return } // Do not stream items that do not belong in this subscription
switch nostrResponse {
case .event(_, let nostrEvent):
if seenEvents.contains(nostrEvent.id) { break } // Don't send two of the same events.
continuation.yield(with: .success(.event(nostrEvent)))
seenEvents.insert(nostrEvent.id)
case .notice(let note):
break // We do not support handling these yet
case .eose(_):
relaysWhoFinishedInitialResults.insert(relayUrl)
if relaysWhoFinishedInitialResults == Set(desiredRelays) {
continuation.yield(with: .success(.eose))
eoseSent = true
}
case .ok(_): break // No need to handle this, we are not sending an event to the relay
case .auth(_): break // Handled in a separate function in RelayPool
}
}
}, to: desiredRelays)
Task {
try? await Task.sleep(nanoseconds: 1_000_000_000 * UInt64(eoseTimeout))
if !eoseSent { continuation.yield(with: .success(.eose)) }
}
continuation.onTermination = { @Sendable _ in
self.unsubscribe(sub_id: sub_id, to: desiredRelays)
self.remove_handler(sub_id: sub_id)
}
}
}
enum StreamItem {
/// A Nostr event
case event(NostrEvent)
/// The "end of stored events" signal
case eose
}
func subscribe_to(sub_id: String, filters: [NostrFilter], to: [RelayURL]?, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) {
register_handler(sub_id: sub_id, handler: handler)