Improve streaming interfaces and profile loading logic

Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-09-19 11:39:07 -07:00
parent a3ef36120e
commit a09e22df24
45 changed files with 381 additions and 353 deletions

View File

@@ -33,6 +33,7 @@ class NostrNetworkManager {
let postbox: PostBox
/// Handles subscriptions and functions to read or consume data from the Nostr network
let reader: SubscriptionManager
let profilesManager: ProfilesManager
init(delegate: Delegate) {
self.delegate = delegate
@@ -43,6 +44,7 @@ class NostrNetworkManager {
self.reader = reader
self.userRelayList = userRelayList
self.postbox = PostBox(pool: pool)
self.profilesManager = ProfilesManager(subscriptionManager: reader, ndb: delegate.ndb)
}
// MARK: - Control functions
@@ -51,6 +53,7 @@ class NostrNetworkManager {
func connect() {
self.userRelayList.connect()
self.pool.open = true
Task { await self.profilesManager.load() }
}
func disconnect() {

View File

@@ -0,0 +1,137 @@
//
// ProfilesManager.swift
// damus
//
// Created by Daniel DAquino on 2025-09-19.
//
import Foundation
extension NostrNetworkManager {
/// Efficiently manages getting profile metadata from the network and NostrDB without too many relay subscriptions
///
/// This is necessary because relays have a limit on how many subscriptions can be sent to relays at one given time.
actor ProfilesManager {
private var profileListenerTask: Task<Void, any Error>? = nil
private var subscriptionSwitcherTask: Task<Void, any Error>? = nil
private var subscriptionNeedsUpdate: Bool = false
private let subscriptionManager: SubscriptionManager
private let ndb: Ndb
private var streams: [Pubkey: [UUID: ProfileStreamInfo]]
// MARK: - Initialization and deinitialization
init(subscriptionManager: SubscriptionManager, ndb: Ndb) {
self.subscriptionManager = subscriptionManager
self.ndb = ndb
self.streams = [:]
}
deinit {
self.subscriptionSwitcherTask?.cancel()
self.profileListenerTask?.cancel()
}
// MARK: - Task management
func load() {
self.restartProfileListenerTask()
self.subscriptionSwitcherTask?.cancel()
self.subscriptionSwitcherTask = Task {
while true {
try await Task.sleep(for: .seconds(1))
try Task.checkCancellation()
if subscriptionNeedsUpdate {
self.restartProfileListenerTask()
subscriptionNeedsUpdate = false
}
}
}
}
func stop() {
self.subscriptionSwitcherTask?.cancel()
self.profileListenerTask?.cancel()
}
private func restartProfileListenerTask() {
self.profileListenerTask?.cancel()
self.profileListenerTask = Task {
try await self.listenToProfileChanges()
}
}
// MARK: - Listening and publishing of profile changes
private func listenToProfileChanges() async throws {
let pubkeys = Array(streams.keys)
guard pubkeys.count > 0 else { return }
let profileFilter = NostrFilter(kinds: [.metadata], authors: pubkeys)
for await ndbLender in self.subscriptionManager.streamIndefinitely(filters: [profileFilter], streamMode: .ndbFirst) {
try Task.checkCancellation()
try? ndbLender.borrow { ev in
publishProfileUpdates(metadataEvent: ev)
}
try Task.checkCancellation()
}
}
private func publishProfileUpdates(metadataEvent: borrowing UnownedNdbNote) {
let now = UInt64(Date.now.timeIntervalSince1970)
ndb.write_profile_last_fetched(pubkey: metadataEvent.pubkey, fetched_at: now)
if let relevantStreams = streams[metadataEvent.pubkey] {
// If we have the user metadata event in ndb, then we should have the profile record as well.
guard let profile = ndb.lookup_profile(metadataEvent.pubkey) else { return }
for relevantStream in relevantStreams.values {
relevantStream.continuation.yield(profile)
}
}
}
// MARK: - Streaming interface
func streamProfile(pubkey: Pubkey) -> AsyncStream<ProfileStreamItem> {
return AsyncStream<ProfileStreamItem> { continuation in
let stream = ProfileStreamInfo(continuation: continuation)
self.add(pubkey: pubkey, stream: stream)
continuation.onTermination = { @Sendable _ in
Task { await self.removeStream(pubkey: pubkey, id: stream.id) }
}
}
}
// MARK: - Stream management
private func add(pubkey: Pubkey, stream: ProfileStreamInfo) {
if self.streams[pubkey] == nil {
self.streams[pubkey] = [:]
self.subscriptionNeedsUpdate = true
}
self.streams[pubkey]?[stream.id] = stream
}
func removeStream(pubkey: Pubkey, id: UUID) {
self.streams[pubkey]?[id] = nil
if self.streams[pubkey]?.keys.count == 0 {
// We don't need to subscribe to this profile anymore
self.streams[pubkey] = nil
self.subscriptionNeedsUpdate = true
}
}
// MARK: - Helper types
typealias ProfileStreamItem = NdbTxn<ProfileRecord?>
struct ProfileStreamInfo {
let id: UUID = UUID()
let continuation: AsyncStream<ProfileStreamItem>.Continuation
}
}
}

View File

@@ -30,11 +30,11 @@ extension NostrNetworkManager {
// MARK: - Subscribing and Streaming data from Nostr
/// Streams notes until the EOSE signal
func streamNotesUntilEndOfStoredEvents(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
func streamExistingEvents(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
let timeout = timeout ?? .seconds(10)
return AsyncStream<NdbNoteLender> { continuation in
let streamingTask = Task {
outerLoop: for await item in self.subscribe(filters: filters, to: desiredRelays, timeout: timeout, id: id) {
outerLoop: for await item in self.advancedStream(filters: filters, to: desiredRelays, timeout: timeout, streamMode: streamMode, id: id) {
try Task.checkCancellation()
switch item {
case .event(let lender):
@@ -58,34 +58,55 @@ extension NostrNetworkManager {
/// Subscribes to data from user's relays, for a maximum period of time after which the stream will end.
///
/// This is useful when waiting for some specific data from Nostr, but not indefinitely.
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration, id: UUID? = nil) -> AsyncStream<StreamItem> {
return AsyncStream<StreamItem> { continuation in
func timedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
return AsyncStream<NdbNoteLender> { continuation in
let streamingTask = Task {
for await item in self.subscribe(filters: filters, to: desiredRelays, id: id) {
for await item in self.advancedStream(filters: filters, to: desiredRelays, timeout: timeout, streamMode: streamMode, id: id) {
try Task.checkCancellation()
continuation.yield(item)
switch item {
case .event(lender: let lender):
continuation.yield(lender)
case .eose: break
case .ndbEose: break
case .networkEose: break
}
}
continuation.finish()
}
continuation.onTermination = { @Sendable _ in
streamingTask.cancel()
}
}
}
/// Subscribes to notes indefinitely
///
/// This is useful when simply streaming all events indefinitely
func streamIndefinitely(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
return AsyncStream<NdbNoteLender> { continuation in
let streamingTask = Task {
for await item in self.advancedStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) {
try Task.checkCancellation()
switch item {
case .event(lender: let lender):
continuation.yield(lender)
case .eose:
break
case .ndbEose:
break
case .networkEose:
break
}
}
}
let timeoutTask = Task {
try await Task.sleep(for: timeout)
continuation.finish() // End the stream due to timeout.
}
continuation.onTermination = { @Sendable _ in
timeoutTask.cancel()
streamingTask.cancel()
}
}
}
/// Subscribes to data from the user's relays
///
/// ## Implementation notes
///
/// - When we migrate to the local relay model, we should modify this function to stream directly from NostrDB
///
/// - Parameter filters: The nostr filters to specify what kind of data to subscribe to
/// - Returns: An async stream of nostr data
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
func advancedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
return AsyncStream<StreamItem> { continuation in
let subscriptionId = id ?? UUID()
let startTime = CFAbsoluteTimeGetCurrent()
@@ -104,7 +125,7 @@ extension NostrNetworkManager {
continue
}
Log.info("%s: Streaming.", for: .subscription_manager, subscriptionId.uuidString)
for await item in self.sessionSubscribe(filters: filters, to: desiredRelays, id: id) {
for await item in self.sessionSubscribe(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) {
try Task.checkCancellation()
continuation.yield(item)
}
@@ -117,9 +138,16 @@ extension NostrNetworkManager {
}
Log.info("%s: Terminated.", for: .subscription_manager, subscriptionId.uuidString)
}
let timeoutTask = Task {
if let timeout {
try await Task.sleep(for: timeout)
continuation.finish() // End the stream due to timeout.
}
}
continuation.onTermination = { @Sendable _ in
Log.info("%s: Cancelled.", for: .subscription_manager, subscriptionId.uuidString)
multiSessionStreamingTask.cancel()
timeoutTask.cancel()
}
}
}
@@ -134,8 +162,9 @@ extension NostrNetworkManager {
///
/// - Parameter filters: The nostr filters to specify what kind of data to subscribe to
/// - Returns: An async stream of nostr data
private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
let id = id ?? UUID()
let streamMode = streamMode ?? defaultStreamMode()
return AsyncStream<StreamItem> { continuation in
let startTime = CFAbsoluteTimeGetCurrent()
Log.debug("Session subscription %s: Started", for: .subscription_manager, id.uuidString)
@@ -147,10 +176,10 @@ extension NostrNetworkManager {
let connectedToNetwork = self.pool.network_monitor.currentPath.status == .satisfied
// In normal mode: Issuing EOSE requires EOSE from both NDB and the network, since they are all considered separate relays
// In experimental local relay model mode: Issuing EOSE requires only EOSE from NDB, since that is the only relay that "matters"
let canIssueEOSE = self.experimentalLocalRelayModelSupport ?
(ndbEOSEIssued)
:
(ndbEOSEIssued && (networkEOSEIssued || !connectedToNetwork))
let canIssueEOSE = switch streamMode {
case .ndbFirst: (ndbEOSEIssued)
case .ndbAndNetworkParallel: (ndbEOSEIssued && (networkEOSEIssued || !connectedToNetwork))
}
if canIssueEOSE {
Log.debug("Session subscription %s: Issued EOSE for session. Elapsed: %.2f seconds", for: .subscription_manager, id.uuidString, CFAbsoluteTimeGetCurrent() - startTime)
@@ -197,8 +226,10 @@ extension NostrNetworkManager {
if EXTRA_VERBOSE_LOGGING {
Log.debug("Session subscription %s: Received kind %d event with id %s from the network", for: .subscription_manager, id.uuidString, event.kind, event.id.hex())
}
if !self.experimentalLocalRelayModelSupport {
// In normal mode (non-experimental), we stream from ndb but also directly from the network
switch streamMode {
case .ndbFirst:
break // NO-OP
case .ndbAndNetworkParallel:
continuation.yield(.event(lender: NdbNoteLender(ownedNdbNote: event)))
}
case .eose:
@@ -229,6 +260,12 @@ extension NostrNetworkManager {
}
}
// MARK: - Utility functions
private func defaultStreamMode() -> StreamMode {
self.experimentalLocalRelayModelSupport ? .ndbFirst : .ndbAndNetworkParallel
}
// MARK: - Finding specific data from Nostr
/// Finds a non-replaceable event based on a note ID
@@ -255,7 +292,7 @@ extension NostrNetworkManager {
func query(filters: [NostrFilter], to: [RelayURL]? = nil, timeout: Duration? = nil) async -> [NostrEvent] {
var events: [NostrEvent] = []
for await noteLender in self.streamNotesUntilEndOfStoredEvents(filters: filters, to: to, timeout: timeout) {
for await noteLender in self.streamExistingEvents(filters: filters, to: to, timeout: timeout) {
noteLender.justUseACopy({ events.append($0) })
}
return events
@@ -270,7 +307,7 @@ extension NostrNetworkManager {
let filter = NostrFilter(kinds: nostrKinds, authors: [naddr.author])
for await noteLender in self.streamNotesUntilEndOfStoredEvents(filters: [filter], to: targetRelays, timeout: timeout) {
for await noteLender in self.streamExistingEvents(filters: [filter], to: targetRelays, timeout: timeout) {
// TODO: This can be refactored to borrow the note instead of copying it. But we need to implement `referenced_params` on `UnownedNdbNote` to do so
guard let event = noteLender.justGetACopy() else { continue }
if event.referenced_params.first?.param.string() == naddr.identifier {
@@ -307,7 +344,7 @@ extension NostrNetworkManager {
var has_event = false
guard let filter else { return nil }
for await noteLender in self.streamNotesUntilEndOfStoredEvents(filters: [filter], to: find_from) {
for await noteLender in self.streamExistingEvents(filters: [filter], to: find_from) {
let foundEvent: FoundEvent? = try? noteLender.borrow({ event in
switch query {
case .profile:
@@ -363,7 +400,7 @@ extension NostrNetworkManager {
enum StreamItem {
/// An event which can be borrowed from NostrDB
case event(lender: NdbNoteLender)
/// The canonical "end of stored events". See implementations of `subscribe` to see when this event is fired in relation to other EOSEs
/// The canonical generic "end of stored events", which depends on the stream mode. See `StreamMode` to see when this event is fired in relation to other EOSEs
case eose
/// "End of stored events" from NostrDB.
case ndbEose
@@ -386,4 +423,12 @@ extension NostrNetworkManager {
}
}
}
/// The mode of streaming
enum StreamMode {
/// Returns notes exclusively through NostrDB, treating it as the only channel for information in the pipeline. Generic EOSE is fired when EOSE is received from NostrDB
case ndbFirst
/// Returns notes from both NostrDB and the network, in parallel, treating it with similar importance against the network relays. Generic EOSE is fired when EOSE is received from both the network and NostrDB
case ndbAndNetworkParallel
}
}

View File

@@ -133,21 +133,15 @@ extension NostrNetworkManager {
func listenAndHandleRelayUpdates() async {
let filter = NostrFilter(kinds: [.relay_list], authors: [delegate.keypair.pubkey])
for await item in self.reader.subscribe(filters: [filter]) {
switch item {
case .event(let lender): // Signature validity already ensured at this point
let currentRelayListCreationDate = self.getUserCurrentRelayListCreationDate()
try? lender.borrow({ note in
guard note.pubkey == self.delegate.keypair.pubkey else { return } // Ensure this new list was ours
guard note.createdAt > (currentRelayListCreationDate ?? 0) else { return } // Ensure this is a newer list
guard let relayList = try? NIP65.RelayList(event: note) else { return } // Ensure it is a valid NIP-65 list
try? self.set(userRelayList: relayList) // Set the validated list
})
case .eose: continue
case .ndbEose: continue
case .networkEose: continue
}
for await noteLender in self.reader.streamIndefinitely(filters: [filter]) {
let currentRelayListCreationDate = self.getUserCurrentRelayListCreationDate()
try? noteLender.borrow({ note in
guard note.pubkey == self.delegate.keypair.pubkey else { return } // Ensure this new list was ours
guard note.createdAt > (currentRelayListCreationDate ?? 0) else { return } // Ensure this is a newer list
guard let relayList = try? NIP65.RelayList(event: note) else { return } // Ensure it is a valid NIP-65 list
try? self.set(userRelayList: relayList) // Set the validated list
})
}
}

View File

@@ -0,0 +1,35 @@
//
// ProfileObserver.swift
// damus
//
// Created by Daniel DAquino on 2025-09-19.
//
import Combine
import Foundation
@MainActor
class ProfileObserver: ObservableObject {
private let pubkey: Pubkey
private var observerTask: Task<Void, any Error>? = nil
private let damusState: DamusState
init(pubkey: Pubkey, damusState: DamusState) {
self.pubkey = pubkey
self.damusState = damusState
self.watchProfileChanges()
}
private func watchProfileChanges() {
observerTask?.cancel()
observerTask = Task {
for await _ in await damusState.nostrNetwork.profilesManager.streamProfile(pubkey: self.pubkey) {
try Task.checkCancellation()
DispatchQueue.main.async { self.objectWillChange.send() }
}
}
}
deinit {
observerTask?.cancel()
}
}

View File

@@ -9,7 +9,7 @@ import Foundation
import LinkPresentation
import EmojiPicker
class DamusState: HeadlessDamusState {
class DamusState: HeadlessDamusState, ObservableObject {
let keypair: Keypair
let likes: EventCounter
let boosts: EventCounter