Add ndb subscription tests

Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-08-22 12:57:50 -07:00
parent e113dee95e
commit 940b83f5c4
6 changed files with 219 additions and 17 deletions

View File

@@ -33,23 +33,28 @@ extension NostrNetworkManager {
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream<StreamItem> {
return AsyncStream<StreamItem> { continuation in
let ndbStreamTask = Task {
for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) {
switch item {
case .eose:
continuation.yield(.eose)
case .event(let noteKey):
let lender: NdbNoteLender = { lend in
guard let ndbNoteTxn = self.ndb.lookup_note_by_key(noteKey) else {
throw NdbNoteLenderError.errorLoadingNote
do {
for await item in try self.ndb.subscribe(filters: try filters.map({ try NdbFilter(from: $0) })) {
switch item {
case .eose:
continuation.yield(.eose)
case .event(let noteKey):
let lender: NdbNoteLender = { lend in
guard let ndbNoteTxn = self.ndb.lookup_note_by_key(noteKey) else {
throw NdbNoteLenderError.errorLoadingNote
}
guard let unownedNote = UnownedNdbNote(ndbNoteTxn) else {
throw NdbNoteLenderError.errorLoadingNote
}
lend(unownedNote)
}
guard let unownedNote = UnownedNdbNote(ndbNoteTxn) else {
throw NdbNoteLenderError.errorLoadingNote
}
lend(unownedNote)
continuation.yield(.event(borrow: lender))
}
continuation.yield(.event(borrow: lender))
}
}
catch {
Log.error("NDB streaming error: %s", for: .ndb, error.localizedDescription)
}
}
let streamTask = Task {
for await _ in self.pool.subscribe(filters: filters, to: desiredRelays) {

View File

@@ -561,8 +561,8 @@ class HomeModel: ContactsDelegate {
try? borrow { ev in
event = ev.toOwned()
}
guard let event else { return }
await self.process_event(ev: event, context: .notifications)
guard let theEvent = event else { return }
await self.process_event(ev: theEvent, context: .notifications)
case .eose:
guard let txn = NdbTxn(ndb: damus_state.ndb) else { return }
load_profiles(context: "notifications", load: .from_keys(notifications.uniq_pubkeys()), damus_state: damus_state, txn: txn)