Improve NostrNetworkManager interfaces

This commit improves NostrNetworkManager interfaces to be easier to use,
and with more options on how to read data from the Nostr network

This reduces the amount of duplicate logic in handling streams, and also
prevents possible common mistakes when using the standard subscribe method.

This fixes an issue with the mute list manager (which prompted for this
interface improvement, as the root cause is similar to other similar
issues).

Closes: https://github.com/damus-io/damus/issues/3221
Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-09-10 13:52:39 -07:00
parent 2bea2faf3f
commit 3290e1f9d2
21 changed files with 312 additions and 296 deletions

View File

@@ -89,95 +89,6 @@ class NostrNetworkManager {
self.pool.send(.event(event), to: targetRelays, skip_ephemeral: skipEphemeralRelays)
}
func query(filters: [NostrFilter], to: [RelayURL]? = nil) async -> [NostrEvent] {
var events: [NostrEvent] = []
for await item in self.reader.subscribe(filters: filters, to: to) {
switch item {
case .event(let borrow):
try? borrow { event in
events.append(event.toOwned())
}
case .eose:
break
}
}
return events
}
/// Finds a replaceable event based on an `naddr` address.
///
/// - Parameters:
/// - naddr: the `naddr` address
func lookup(naddr: NAddr) async -> NostrEvent? {
var nostrKinds: [NostrKind]? = NostrKind(rawValue: naddr.kind).map { [$0] }
let filter = NostrFilter(kinds: nostrKinds, authors: [naddr.author])
for await item in self.reader.subscribe(filters: [filter]) {
switch item {
case .event(let borrow):
var event: NostrEvent? = nil
try? borrow { ev in
event = ev.toOwned()
}
if event?.referenced_params.first?.param.string() == naddr.identifier {
return event
}
case .eose:
break
}
}
return nil
}
// TODO: Improve this. This is mostly intact to keep compatibility with its predecessor, but we can do better
func findEvent(query: FindEvent) async -> FoundEvent? {
var filter: NostrFilter? = nil
let find_from = query.find_from
let query = query.type
switch query {
case .profile(let pubkey):
if let profile_txn = delegate.ndb.lookup_profile(pubkey),
let record = profile_txn.unsafeUnownedValue,
record.profile != nil
{
return .profile(pubkey)
}
filter = NostrFilter(kinds: [.metadata], limit: 1, authors: [pubkey])
case .event(let evid):
if let event = delegate.ndb.lookup_note(evid)?.unsafeUnownedValue?.to_owned() {
return .event(event)
}
filter = NostrFilter(ids: [evid], limit: 1)
}
var attempts: Int = 0
var has_event = false
guard let filter else { return nil }
for await item in self.reader.subscribe(filters: [filter], to: find_from) {
switch item {
case .event(let borrow):
var result: FoundEvent? = nil
try? borrow { event in
switch query {
case .profile:
if event.known_kind == .metadata {
result = .profile(event.pubkey)
}
case .event:
result = .event(event.toOwned())
}
}
return result
case .eose:
return nil
}
}
return nil
}
func getRelay(_ id: RelayURL) -> RelayPool.Relay? {
pool.get_relay(id)
}

View File

@@ -23,7 +23,29 @@ extension NostrNetworkManager {
self.taskManager = TaskManager()
}
// MARK: - Reading data from Nostr
// MARK: - Subscribing and Streaming data from Nostr
/// Streams notes until the EOSE signal
func streamNotesUntilEndOfStoredEvents(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil) -> AsyncStream<NdbNoteLender> {
let timeout = timeout ?? .seconds(10)
return AsyncStream<NdbNoteLender> { continuation in
let streamingTask = Task {
outerLoop: for await item in self.subscribe(filters: filters, to: desiredRelays, timeout: timeout) {
try Task.checkCancellation()
switch item {
case .event(let lender):
continuation.yield(lender)
case .eose:
break outerLoop
}
}
continuation.finish()
}
continuation.onTermination = { @Sendable _ in
streamingTask.cancel()
}
}
}
/// Subscribes to data from user's relays, for a maximum period of time after which the stream will end.
///
@@ -113,17 +135,9 @@ extension NostrNetworkManager {
case .eose:
continuation.yield(.eose)
case .event(let noteKey):
let lender: NdbNoteLender = { lend in
guard let ndbNoteTxn = self.ndb.lookup_note_by_key(noteKey) else {
throw NdbNoteLenderError.errorLoadingNote
}
guard let unownedNote = UnownedNdbNote(ndbNoteTxn) else {
throw NdbNoteLenderError.errorLoadingNote
}
lend(unownedNote)
}
let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey)
try Task.checkCancellation()
continuation.yield(.event(borrow: lender))
continuation.yield(.event(lender: lender))
}
}
}
@@ -166,6 +180,106 @@ extension NostrNetworkManager {
}
}
// MARK: - Finding specific data from Nostr
/// Finds a non-replaceable event based on a note ID
func lookup(noteId: NoteId, to targetRelays: [RelayURL]? = nil, timeout: Duration? = nil) async throws -> NdbNoteLender? {
let filter = NostrFilter(ids: [noteId], limit: 1)
// Since note ids point to immutable objects, we can do a simple ndb lookup first
if let noteKey = self.ndb.lookup_note_key(noteId) {
return NdbNoteLender(ndb: self.ndb, noteKey: noteKey)
}
// Not available in local ndb, stream from network
outerLoop: for await item in self.pool.subscribe(filters: [NostrFilter(ids: [noteId], limit: 1)], to: targetRelays, eoseTimeout: timeout) {
switch item {
case .event(let event):
return NdbNoteLender(ownedNdbNote: event)
case .eose:
break outerLoop
}
}
return nil
}
func query(filters: [NostrFilter], to: [RelayURL]? = nil, timeout: Duration? = nil) async -> [NostrEvent] {
var events: [NostrEvent] = []
for await noteLender in self.streamNotesUntilEndOfStoredEvents(filters: filters, to: to, timeout: timeout) {
noteLender.justUseACopy({ events.append($0) })
}
return events
}
/// Finds a replaceable event based on an `naddr` address.
///
/// - Parameters:
/// - naddr: the `naddr` address
func lookup(naddr: NAddr, to targetRelays: [RelayURL]? = nil, timeout: Duration? = nil) async -> NostrEvent? {
var nostrKinds: [NostrKind]? = NostrKind(rawValue: naddr.kind).map { [$0] }
let filter = NostrFilter(kinds: nostrKinds, authors: [naddr.author])
for await noteLender in self.streamNotesUntilEndOfStoredEvents(filters: [filter], to: targetRelays, timeout: timeout) {
// TODO: This can be refactored to borrow the note instead of copying it. But we need to implement `referenced_params` on `UnownedNdbNote` to do so
guard let event = noteLender.justGetACopy() else { continue }
if event.referenced_params.first?.param.string() == naddr.identifier {
return event
}
}
return nil
}
// TODO: Improve this. This is mostly intact to keep compatibility with its predecessor, but we can do better
func findEvent(query: FindEvent) async -> FoundEvent? {
var filter: NostrFilter? = nil
let find_from = query.find_from
let query = query.type
switch query {
case .profile(let pubkey):
if let profile_txn = self.ndb.lookup_profile(pubkey),
let record = profile_txn.unsafeUnownedValue,
record.profile != nil
{
return .profile(pubkey)
}
filter = NostrFilter(kinds: [.metadata], limit: 1, authors: [pubkey])
case .event(let evid):
if let event = self.ndb.lookup_note(evid)?.unsafeUnownedValue?.to_owned() {
return .event(event)
}
filter = NostrFilter(ids: [evid], limit: 1)
}
var attempts: Int = 0
var has_event = false
guard let filter else { return nil }
for await noteLender in self.streamNotesUntilEndOfStoredEvents(filters: [filter], to: find_from) {
let foundEvent: FoundEvent? = try? noteLender.borrow({ event in
switch query {
case .profile:
if event.known_kind == .metadata {
return .profile(event.pubkey)
}
case .event:
return .event(event.toOwned())
}
return nil
})
if let foundEvent {
return foundEvent
}
}
return nil
}
// MARK: - Task management
func cancelAllTasks() async {
await self.taskManager.cancelAllTasks()
}
@@ -199,7 +313,7 @@ extension NostrNetworkManager {
enum StreamItem {
/// An event which can be borrowed from NostrDB
case event(borrow: NdbNoteLender)
case event(lender: NdbNoteLender)
/// The end of stored events
case eose
}

View File

@@ -135,15 +135,15 @@ extension NostrNetworkManager {
let filter = NostrFilter(kinds: [.relay_list], authors: [delegate.keypair.pubkey])
for await item in self.reader.subscribe(filters: [filter]) {
switch item {
case .event(borrow: let borrow): // Signature validity already ensured at this point
case .event(let lender): // Signature validity already ensured at this point
let currentRelayListCreationDate = self.getUserCurrentRelayListCreationDate()
try? borrow { note in
try? lender.borrow({ note in
guard note.pubkey == self.delegate.keypair.pubkey else { return } // Ensure this new list was ours
guard note.createdAt > (currentRelayListCreationDate ?? 0) else { return } // Ensure this is a newer list
guard let relayList = try? NIP65.RelayList(event: note) else { return } // Ensure it is a valid NIP-65 list
try? self.set(userRelayList: relayList) // Set the validated list
}
})
case .eose: continue
}
}

View File

@@ -219,9 +219,10 @@ class RelayPool {
/// - Parameters:
/// - filters: The filters specifying the desired content.
/// - desiredRelays: The desired relays which to subsctibe to. If `nil`, it defaults to the `RelayPool`'s default list
/// - eoseTimeout: The maximum timeout which to give up waiting for the eoseSignal, in seconds
/// - eoseTimeout: The maximum timeout which to give up waiting for the eoseSignal
/// - Returns: Returns an async stream that callers can easily consume via a for-loop
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, eoseTimeout: TimeInterval = 10) -> AsyncStream<StreamItem> {
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, eoseTimeout: Duration? = nil) -> AsyncStream<StreamItem> {
let eoseTimeout = eoseTimeout ?? .seconds(10)
let desiredRelays = desiredRelays ?? self.relays.map({ $0.descriptor.url })
return AsyncStream<StreamItem> { continuation in
let sub_id = UUID().uuidString
@@ -255,7 +256,7 @@ class RelayPool {
}
}, to: desiredRelays)
Task {
try? await Task.sleep(nanoseconds: 1_000_000_000 * UInt64(eoseTimeout))
try? await Task.sleep(for: eoseTimeout)
if !eoseSent { continuation.yield(with: .success(.eose)) }
}
continuation.onTermination = { @Sendable _ in