diff --git a/damus.xcodeproj/project.pbxproj b/damus.xcodeproj/project.pbxproj index 8d627516..d1792c39 100644 --- a/damus.xcodeproj/project.pbxproj +++ b/damus.xcodeproj/project.pbxproj @@ -1161,6 +1161,12 @@ D72A2D022AD9C136002AFF62 /* EventViewTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72A2CFF2AD9B66B002AFF62 /* EventViewTests.swift */; }; D72A2D052AD9C1B5002AFF62 /* MockDamusState.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72A2D042AD9C1B5002AFF62 /* MockDamusState.swift */; }; D72A2D072AD9C1FB002AFF62 /* MockProfiles.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72A2D062AD9C1FB002AFF62 /* MockProfiles.swift */; }; + D72B6FA22E7DFB450050CD1D /* ProfilesManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72B6FA12E7DFB3F0050CD1D /* ProfilesManager.swift */; }; + D72B6FA32E7DFB450050CD1D /* ProfilesManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72B6FA12E7DFB3F0050CD1D /* ProfilesManager.swift */; }; + D72B6FA42E7DFB450050CD1D /* ProfilesManager.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72B6FA12E7DFB3F0050CD1D /* ProfilesManager.swift */; }; + D72B6FA62E7E06AD0050CD1D /* ProfileObserver.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72B6FA52E7E06A40050CD1D /* ProfileObserver.swift */; }; + D72B6FA72E7E06AD0050CD1D /* ProfileObserver.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72B6FA52E7E06A40050CD1D /* ProfileObserver.swift */; }; + D72B6FA92E7E06AD0050CD1D /* ProfileObserver.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72B6FA52E7E06A40050CD1D /* ProfileObserver.swift */; }; D72C01312E78C10500AACB67 /* CondensedProfilePicturesViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72C01302E78C0FB00AACB67 /* CondensedProfilePicturesViewModel.swift */; }; D72C01322E78C10500AACB67 /* CondensedProfilePicturesViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72C01302E78C0FB00AACB67 /* CondensedProfilePicturesViewModel.swift */; }; D72C01332E78C10500AACB67 /* CondensedProfilePicturesViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = D72C01302E78C0FB00AACB67 /* CondensedProfilePicturesViewModel.swift */; }; @@ -2614,6 +2620,8 @@ D72A2CFF2AD9B66B002AFF62 /* EventViewTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EventViewTests.swift; sourceTree = ""; }; D72A2D042AD9C1B5002AFF62 /* MockDamusState.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockDamusState.swift; sourceTree = ""; }; D72A2D062AD9C1FB002AFF62 /* MockProfiles.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockProfiles.swift; sourceTree = ""; }; + D72B6FA12E7DFB3F0050CD1D /* ProfilesManager.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ProfilesManager.swift; sourceTree = ""; }; + D72B6FA52E7E06A40050CD1D /* ProfileObserver.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ProfileObserver.swift; sourceTree = ""; }; D72C01302E78C0FB00AACB67 /* CondensedProfilePicturesViewModel.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CondensedProfilePicturesViewModel.swift; sourceTree = ""; }; D72E12772BEED22400F4F781 /* Array.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Array.swift; sourceTree = ""; }; D72E12792BEEEED000F4F781 /* NostrFilterTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NostrFilterTests.swift; sourceTree = ""; }; @@ -3105,6 +3113,7 @@ 4C75EFAB28049CC80006080F /* Nostr */ = { isa = PBXGroup; children = ( + D72B6FA52E7E06A40050CD1D /* ProfileObserver.swift */, 4CE6DF1527F8DEBF00C66700 /* RelayConnection.swift */, 50A60D132A28BEEE00186190 /* RelayLog.swift */, 4C75EFA527FF87A20006080F /* Nostr.swift */, @@ -4907,6 +4916,7 @@ D73BDB122D71212600D69970 /* NostrNetworkManager */ = { isa = PBXGroup; children = ( + D72B6FA12E7DFB3F0050CD1D /* ProfilesManager.swift */, D733F9E02D92C1AA00317B11 /* SubscriptionManager.swift */, D73BDB172D71310C00D69970 /* UserRelayListErrors.swift */, D73BDB132D71215F00D69970 /* UserRelayListManager.swift */, @@ -5712,6 +5722,7 @@ 4C64305C2A945AFF00B0C0E9 /* MusicController.swift in Sources */, 5053ACA72A56DF3B00851AE3 /* DeveloperSettingsView.swift in Sources */, F79C7FAD29D5E9620000F946 /* EditPictureControl.swift in Sources */, + D72B6FA62E7E06AD0050CD1D /* ProfileObserver.swift in Sources */, 4C011B5F2BD0A56A002F2F9B /* ChatroomThreadView.swift in Sources */, 4C9F18E229AA9B6C008C55EC /* CustomizeZapView.swift in Sources */, 4C2859602A12A2BE004746F7 /* SupporterBadge.swift in Sources */, @@ -5764,6 +5775,7 @@ 4C5F9114283D694D0052CD1C /* FollowTarget.swift in Sources */, 5C0567582C8FBC560073F23A /* NDBSearchView.swift in Sources */, D72341192B6864F200E1E135 /* DamusPurpleEnvironment.swift in Sources */, + D72B6FA32E7DFB450050CD1D /* ProfilesManager.swift in Sources */, 4CF0ABD629817F5B00D66079 /* ReportView.swift in Sources */, D71528002E0A3D6900C893D6 /* InterestList.swift in Sources */, 4C1A9A2729DDE31900516EAC /* TranslationSettingsView.swift in Sources */, @@ -6064,6 +6076,7 @@ 82D6FAC12CD99F7900C925F4 /* AsciiCharacter.swift in Sources */, 82D6FAC22CD99F7900C925F4 /* NdbTagElem.swift in Sources */, 82D6FAC32CD99F7900C925F4 /* Ndb.swift in Sources */, + D72B6FA92E7E06AD0050CD1D /* ProfileObserver.swift in Sources */, 82D6FAC42CD99F7900C925F4 /* NdbTagsIterator.swift in Sources */, 82D6FAC52CD99F7900C925F4 /* NdbTxn.swift in Sources */, 82D6FAC72CD99F7900C925F4 /* midl.c in Sources */, @@ -6124,6 +6137,7 @@ 82D6FB052CD99F7900C925F4 /* MusicController.swift in Sources */, 82D6FB062CD99F7900C925F4 /* UserStatusView.swift in Sources */, 82D6FB072CD99F7900C925F4 /* UserStatus.swift in Sources */, + D72B6FA22E7DFB450050CD1D /* ProfilesManager.swift in Sources */, 5CB017262D42C5C400A9ED05 /* TransactionsView.swift in Sources */, 82D6FB082CD99F7900C925F4 /* UserStatusSheet.swift in Sources */, 82D6FB092CD99F7900C925F4 /* SearchHeaderView.swift in Sources */, @@ -6551,6 +6565,7 @@ D73E5E242C6A97F4007EB227 /* FollowedNotify.swift in Sources */, D73E5E252C6A97F4007EB227 /* FollowNotify.swift in Sources */, D73E5E262C6A97F4007EB227 /* LikedNotify.swift in Sources */, + D72B6FA42E7DFB450050CD1D /* ProfilesManager.swift in Sources */, D73E5E272C6A97F4007EB227 /* LocalNotificationNotify.swift in Sources */, D73E5F8B2C6AA6A2007EB227 /* UserStatusSheet.swift in Sources */, D73E5E282C6A97F4007EB227 /* LoginNotify.swift in Sources */, @@ -6664,6 +6679,7 @@ D73E5E8A2C6A97F4007EB227 /* PurpleStoreKitManager.swift in Sources */, D733F9E72D92C76100317B11 /* UnownedNdbNote.swift in Sources */, D73E5E8E2C6A97F4007EB227 /* ImageResizer.swift in Sources */, + D72B6FA72E7E06AD0050CD1D /* ProfileObserver.swift in Sources */, D78F080E2D7F78EF00FC6C75 /* Request.swift in Sources */, D73E5E8F2C6A97F4007EB227 /* PhotoCaptureProcessor.swift in Sources */, D773BC602C6D538500349F0A /* CommentItem.swift in Sources */, diff --git a/damus/ContentView.swift b/damus/ContentView.swift index c024145d..2984943b 100644 --- a/damus/ContentView.swift +++ b/damus/ContentView.swift @@ -819,7 +819,7 @@ struct TopbarSideMenuButton: View { Button { isSideBarOpened.toggle() } label: { - ProfilePicView(pubkey: damus_state.pubkey, size: 32, highlight: .none, profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation) + ProfilePicView(pubkey: damus_state.pubkey, size: 32, highlight: .none, profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation, damusState: damus_state) .opacity(isSideBarOpened ? 0 : 1) .animation(isSideBarOpened ? .none : .default, value: isSideBarOpened) .accessibilityHidden(true) // Knowing there is a profile picture here leads to no actionable outcome to VoiceOver users, so it is best not to show it diff --git a/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift b/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift index 80321f8b..1157314b 100644 --- a/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift @@ -33,6 +33,7 @@ class NostrNetworkManager { let postbox: PostBox /// Handles subscriptions and functions to read or consume data from the Nostr network let reader: SubscriptionManager + let profilesManager: ProfilesManager init(delegate: Delegate) { self.delegate = delegate @@ -43,6 +44,7 @@ class NostrNetworkManager { self.reader = reader self.userRelayList = userRelayList self.postbox = PostBox(pool: pool) + self.profilesManager = ProfilesManager(subscriptionManager: reader, ndb: delegate.ndb) } // MARK: - Control functions @@ -51,6 +53,7 @@ class NostrNetworkManager { func connect() { self.userRelayList.connect() self.pool.open = true + Task { await self.profilesManager.load() } } func disconnect() { diff --git a/damus/Core/Networking/NostrNetworkManager/ProfilesManager.swift b/damus/Core/Networking/NostrNetworkManager/ProfilesManager.swift new file mode 100644 index 00000000..2b0f462d --- /dev/null +++ b/damus/Core/Networking/NostrNetworkManager/ProfilesManager.swift @@ -0,0 +1,137 @@ +// +// ProfilesManager.swift +// damus +// +// Created by Daniel D’Aquino on 2025-09-19. +// +import Foundation + +extension NostrNetworkManager { + /// Efficiently manages getting profile metadata from the network and NostrDB without too many relay subscriptions + /// + /// This is necessary because relays have a limit on how many subscriptions can be sent to relays at one given time. + actor ProfilesManager { + private var profileListenerTask: Task? = nil + private var subscriptionSwitcherTask: Task? = nil + private var subscriptionNeedsUpdate: Bool = false + private let subscriptionManager: SubscriptionManager + private let ndb: Ndb + private var streams: [Pubkey: [UUID: ProfileStreamInfo]] + + + // MARK: - Initialization and deinitialization + + init(subscriptionManager: SubscriptionManager, ndb: Ndb) { + self.subscriptionManager = subscriptionManager + self.ndb = ndb + self.streams = [:] + } + + deinit { + self.subscriptionSwitcherTask?.cancel() + self.profileListenerTask?.cancel() + } + + // MARK: - Task management + + func load() { + self.restartProfileListenerTask() + self.subscriptionSwitcherTask?.cancel() + self.subscriptionSwitcherTask = Task { + while true { + try await Task.sleep(for: .seconds(1)) + try Task.checkCancellation() + if subscriptionNeedsUpdate { + self.restartProfileListenerTask() + subscriptionNeedsUpdate = false + } + } + } + } + + func stop() { + self.subscriptionSwitcherTask?.cancel() + self.profileListenerTask?.cancel() + } + + private func restartProfileListenerTask() { + self.profileListenerTask?.cancel() + self.profileListenerTask = Task { + try await self.listenToProfileChanges() + } + } + + + // MARK: - Listening and publishing of profile changes + + private func listenToProfileChanges() async throws { + let pubkeys = Array(streams.keys) + guard pubkeys.count > 0 else { return } + let profileFilter = NostrFilter(kinds: [.metadata], authors: pubkeys) + for await ndbLender in self.subscriptionManager.streamIndefinitely(filters: [profileFilter], streamMode: .ndbFirst) { + try Task.checkCancellation() + try? ndbLender.borrow { ev in + publishProfileUpdates(metadataEvent: ev) + } + try Task.checkCancellation() + } + } + + private func publishProfileUpdates(metadataEvent: borrowing UnownedNdbNote) { + let now = UInt64(Date.now.timeIntervalSince1970) + ndb.write_profile_last_fetched(pubkey: metadataEvent.pubkey, fetched_at: now) + + if let relevantStreams = streams[metadataEvent.pubkey] { + // If we have the user metadata event in ndb, then we should have the profile record as well. + guard let profile = ndb.lookup_profile(metadataEvent.pubkey) else { return } + for relevantStream in relevantStreams.values { + relevantStream.continuation.yield(profile) + } + } + } + + + // MARK: - Streaming interface + + func streamProfile(pubkey: Pubkey) -> AsyncStream { + return AsyncStream { continuation in + let stream = ProfileStreamInfo(continuation: continuation) + self.add(pubkey: pubkey, stream: stream) + + continuation.onTermination = { @Sendable _ in + Task { await self.removeStream(pubkey: pubkey, id: stream.id) } + } + } + } + + + // MARK: - Stream management + + private func add(pubkey: Pubkey, stream: ProfileStreamInfo) { + if self.streams[pubkey] == nil { + self.streams[pubkey] = [:] + self.subscriptionNeedsUpdate = true + } + self.streams[pubkey]?[stream.id] = stream + } + + func removeStream(pubkey: Pubkey, id: UUID) { + self.streams[pubkey]?[id] = nil + if self.streams[pubkey]?.keys.count == 0 { + // We don't need to subscribe to this profile anymore + self.streams[pubkey] = nil + self.subscriptionNeedsUpdate = true + } + } + + + // MARK: - Helper types + + typealias ProfileStreamItem = NdbTxn + + struct ProfileStreamInfo { + let id: UUID = UUID() + let continuation: AsyncStream.Continuation + } + } +} diff --git a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift index 7ffec8e2..0207f121 100644 --- a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift @@ -30,11 +30,11 @@ extension NostrNetworkManager { // MARK: - Subscribing and Streaming data from Nostr /// Streams notes until the EOSE signal - func streamNotesUntilEndOfStoredEvents(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, id: UUID? = nil) -> AsyncStream { + func streamExistingEvents(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { let timeout = timeout ?? .seconds(10) return AsyncStream { continuation in let streamingTask = Task { - outerLoop: for await item in self.subscribe(filters: filters, to: desiredRelays, timeout: timeout, id: id) { + outerLoop: for await item in self.advancedStream(filters: filters, to: desiredRelays, timeout: timeout, streamMode: streamMode, id: id) { try Task.checkCancellation() switch item { case .event(let lender): @@ -58,34 +58,55 @@ extension NostrNetworkManager { /// Subscribes to data from user's relays, for a maximum period of time — after which the stream will end. /// /// This is useful when waiting for some specific data from Nostr, but not indefinitely. - func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration, id: UUID? = nil) -> AsyncStream { - return AsyncStream { continuation in + func timedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { + return AsyncStream { continuation in let streamingTask = Task { - for await item in self.subscribe(filters: filters, to: desiredRelays, id: id) { + for await item in self.advancedStream(filters: filters, to: desiredRelays, timeout: timeout, streamMode: streamMode, id: id) { try Task.checkCancellation() - continuation.yield(item) + switch item { + case .event(lender: let lender): + continuation.yield(lender) + case .eose: break + case .ndbEose: break + case .networkEose: break + } + } + continuation.finish() + } + continuation.onTermination = { @Sendable _ in + streamingTask.cancel() + } + } + } + + /// Subscribes to notes indefinitely + /// + /// This is useful when simply streaming all events indefinitely + func streamIndefinitely(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { + return AsyncStream { continuation in + let streamingTask = Task { + for await item in self.advancedStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { + try Task.checkCancellation() + switch item { + case .event(lender: let lender): + continuation.yield(lender) + case .eose: + break + case .ndbEose: + break + case .networkEose: + break + } } - } - let timeoutTask = Task { - try await Task.sleep(for: timeout) - continuation.finish() // End the stream due to timeout. } continuation.onTermination = { @Sendable _ in - timeoutTask.cancel() streamingTask.cancel() } } } /// Subscribes to data from the user's relays - /// - /// ## Implementation notes - /// - /// - When we migrate to the local relay model, we should modify this function to stream directly from NostrDB - /// - /// - Parameter filters: The nostr filters to specify what kind of data to subscribe to - /// - Returns: An async stream of nostr data - func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, id: UUID? = nil) -> AsyncStream { + func advancedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { return AsyncStream { continuation in let subscriptionId = id ?? UUID() let startTime = CFAbsoluteTimeGetCurrent() @@ -104,7 +125,7 @@ extension NostrNetworkManager { continue } Log.info("%s: Streaming.", for: .subscription_manager, subscriptionId.uuidString) - for await item in self.sessionSubscribe(filters: filters, to: desiredRelays, id: id) { + for await item in self.sessionSubscribe(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { try Task.checkCancellation() continuation.yield(item) } @@ -117,9 +138,16 @@ extension NostrNetworkManager { } Log.info("%s: Terminated.", for: .subscription_manager, subscriptionId.uuidString) } + let timeoutTask = Task { + if let timeout { + try await Task.sleep(for: timeout) + continuation.finish() // End the stream due to timeout. + } + } continuation.onTermination = { @Sendable _ in Log.info("%s: Cancelled.", for: .subscription_manager, subscriptionId.uuidString) multiSessionStreamingTask.cancel() + timeoutTask.cancel() } } } @@ -134,8 +162,9 @@ extension NostrNetworkManager { /// /// - Parameter filters: The nostr filters to specify what kind of data to subscribe to /// - Returns: An async stream of nostr data - private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, id: UUID? = nil) -> AsyncStream { + private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream { let id = id ?? UUID() + let streamMode = streamMode ?? defaultStreamMode() return AsyncStream { continuation in let startTime = CFAbsoluteTimeGetCurrent() Log.debug("Session subscription %s: Started", for: .subscription_manager, id.uuidString) @@ -147,10 +176,10 @@ extension NostrNetworkManager { let connectedToNetwork = self.pool.network_monitor.currentPath.status == .satisfied // In normal mode: Issuing EOSE requires EOSE from both NDB and the network, since they are all considered separate relays // In experimental local relay model mode: Issuing EOSE requires only EOSE from NDB, since that is the only relay that "matters" - let canIssueEOSE = self.experimentalLocalRelayModelSupport ? - (ndbEOSEIssued) - : - (ndbEOSEIssued && (networkEOSEIssued || !connectedToNetwork)) + let canIssueEOSE = switch streamMode { + case .ndbFirst: (ndbEOSEIssued) + case .ndbAndNetworkParallel: (ndbEOSEIssued && (networkEOSEIssued || !connectedToNetwork)) + } if canIssueEOSE { Log.debug("Session subscription %s: Issued EOSE for session. Elapsed: %.2f seconds", for: .subscription_manager, id.uuidString, CFAbsoluteTimeGetCurrent() - startTime) @@ -197,8 +226,10 @@ extension NostrNetworkManager { if EXTRA_VERBOSE_LOGGING { Log.debug("Session subscription %s: Received kind %d event with id %s from the network", for: .subscription_manager, id.uuidString, event.kind, event.id.hex()) } - if !self.experimentalLocalRelayModelSupport { - // In normal mode (non-experimental), we stream from ndb but also directly from the network + switch streamMode { + case .ndbFirst: + break // NO-OP + case .ndbAndNetworkParallel: continuation.yield(.event(lender: NdbNoteLender(ownedNdbNote: event))) } case .eose: @@ -229,6 +260,12 @@ extension NostrNetworkManager { } } + // MARK: - Utility functions + + private func defaultStreamMode() -> StreamMode { + self.experimentalLocalRelayModelSupport ? .ndbFirst : .ndbAndNetworkParallel + } + // MARK: - Finding specific data from Nostr /// Finds a non-replaceable event based on a note ID @@ -255,7 +292,7 @@ extension NostrNetworkManager { func query(filters: [NostrFilter], to: [RelayURL]? = nil, timeout: Duration? = nil) async -> [NostrEvent] { var events: [NostrEvent] = [] - for await noteLender in self.streamNotesUntilEndOfStoredEvents(filters: filters, to: to, timeout: timeout) { + for await noteLender in self.streamExistingEvents(filters: filters, to: to, timeout: timeout) { noteLender.justUseACopy({ events.append($0) }) } return events @@ -270,7 +307,7 @@ extension NostrNetworkManager { let filter = NostrFilter(kinds: nostrKinds, authors: [naddr.author]) - for await noteLender in self.streamNotesUntilEndOfStoredEvents(filters: [filter], to: targetRelays, timeout: timeout) { + for await noteLender in self.streamExistingEvents(filters: [filter], to: targetRelays, timeout: timeout) { // TODO: This can be refactored to borrow the note instead of copying it. But we need to implement `referenced_params` on `UnownedNdbNote` to do so guard let event = noteLender.justGetACopy() else { continue } if event.referenced_params.first?.param.string() == naddr.identifier { @@ -307,7 +344,7 @@ extension NostrNetworkManager { var has_event = false guard let filter else { return nil } - for await noteLender in self.streamNotesUntilEndOfStoredEvents(filters: [filter], to: find_from) { + for await noteLender in self.streamExistingEvents(filters: [filter], to: find_from) { let foundEvent: FoundEvent? = try? noteLender.borrow({ event in switch query { case .profile: @@ -363,7 +400,7 @@ extension NostrNetworkManager { enum StreamItem { /// An event which can be borrowed from NostrDB case event(lender: NdbNoteLender) - /// The canonical "end of stored events". See implementations of `subscribe` to see when this event is fired in relation to other EOSEs + /// The canonical generic "end of stored events", which depends on the stream mode. See `StreamMode` to see when this event is fired in relation to other EOSEs case eose /// "End of stored events" from NostrDB. case ndbEose @@ -386,4 +423,12 @@ extension NostrNetworkManager { } } } + + /// The mode of streaming + enum StreamMode { + /// Returns notes exclusively through NostrDB, treating it as the only channel for information in the pipeline. Generic EOSE is fired when EOSE is received from NostrDB + case ndbFirst + /// Returns notes from both NostrDB and the network, in parallel, treating it with similar importance against the network relays. Generic EOSE is fired when EOSE is received from both the network and NostrDB + case ndbAndNetworkParallel + } } diff --git a/damus/Core/Networking/NostrNetworkManager/UserRelayListManager.swift b/damus/Core/Networking/NostrNetworkManager/UserRelayListManager.swift index f0ed0da8..cd4392ba 100644 --- a/damus/Core/Networking/NostrNetworkManager/UserRelayListManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/UserRelayListManager.swift @@ -133,21 +133,15 @@ extension NostrNetworkManager { func listenAndHandleRelayUpdates() async { let filter = NostrFilter(kinds: [.relay_list], authors: [delegate.keypair.pubkey]) - for await item in self.reader.subscribe(filters: [filter]) { - switch item { - case .event(let lender): // Signature validity already ensured at this point - let currentRelayListCreationDate = self.getUserCurrentRelayListCreationDate() - try? lender.borrow({ note in - guard note.pubkey == self.delegate.keypair.pubkey else { return } // Ensure this new list was ours - guard note.createdAt > (currentRelayListCreationDate ?? 0) else { return } // Ensure this is a newer list - guard let relayList = try? NIP65.RelayList(event: note) else { return } // Ensure it is a valid NIP-65 list - - try? self.set(userRelayList: relayList) // Set the validated list - }) - case .eose: continue - case .ndbEose: continue - case .networkEose: continue - } + for await noteLender in self.reader.streamIndefinitely(filters: [filter]) { + let currentRelayListCreationDate = self.getUserCurrentRelayListCreationDate() + try? noteLender.borrow({ note in + guard note.pubkey == self.delegate.keypair.pubkey else { return } // Ensure this new list was ours + guard note.createdAt > (currentRelayListCreationDate ?? 0) else { return } // Ensure this is a newer list + guard let relayList = try? NIP65.RelayList(event: note) else { return } // Ensure it is a valid NIP-65 list + + try? self.set(userRelayList: relayList) // Set the validated list + }) } } diff --git a/damus/Core/Nostr/ProfileObserver.swift b/damus/Core/Nostr/ProfileObserver.swift new file mode 100644 index 00000000..a67f3e22 --- /dev/null +++ b/damus/Core/Nostr/ProfileObserver.swift @@ -0,0 +1,35 @@ +// +// ProfileObserver.swift +// damus +// +// Created by Daniel D’Aquino on 2025-09-19. +// +import Combine +import Foundation + +@MainActor +class ProfileObserver: ObservableObject { + private let pubkey: Pubkey + private var observerTask: Task? = nil + private let damusState: DamusState + + init(pubkey: Pubkey, damusState: DamusState) { + self.pubkey = pubkey + self.damusState = damusState + self.watchProfileChanges() + } + + private func watchProfileChanges() { + observerTask?.cancel() + observerTask = Task { + for await _ in await damusState.nostrNetwork.profilesManager.streamProfile(pubkey: self.pubkey) { + try Task.checkCancellation() + DispatchQueue.main.async { self.objectWillChange.send() } + } + } + } + + deinit { + observerTask?.cancel() + } +} diff --git a/damus/Core/Storage/DamusState.swift b/damus/Core/Storage/DamusState.swift index f7170a03..85d7a052 100644 --- a/damus/Core/Storage/DamusState.swift +++ b/damus/Core/Storage/DamusState.swift @@ -9,7 +9,7 @@ import Foundation import LinkPresentation import EmojiPicker -class DamusState: HeadlessDamusState { +class DamusState: HeadlessDamusState, ObservableObject { let keypair: Keypair let likes: EventCounter let boosts: EventCounter diff --git a/damus/Features/Actions/Reposts/Views/Reposted.swift b/damus/Features/Actions/Reposts/Views/Reposted.swift index 3387e36c..465a0548 100644 --- a/damus/Features/Actions/Reposts/Views/Reposted.swift +++ b/damus/Features/Actions/Reposts/Views/Reposted.swift @@ -27,7 +27,7 @@ struct Reposted: View { // Show profile picture of the reposter only if the reposter is not the author of the reposted note. if pubkey != target.pubkey { - ProfilePicView(pubkey: pubkey, size: eventview_pfp_size(.small), highlight: .none, profiles: damus.profiles, disable_animation: damus.settings.disable_animation) + ProfilePicView(pubkey: pubkey, size: eventview_pfp_size(.small), highlight: .none, profiles: damus.profiles, disable_animation: damus.settings.disable_animation, damusState: damus) .onTapGesture { show_profile_action_sheet_if_enabled(damus_state: damus, pubkey: pubkey) } diff --git a/damus/Features/Chat/ChatEventView.swift b/damus/Features/Chat/ChatEventView.swift index 6ce5cd35..530e1b6c 100644 --- a/damus/Features/Chat/ChatEventView.swift +++ b/damus/Features/Chat/ChatEventView.swift @@ -83,7 +83,7 @@ struct ChatEventView: View { var profile_picture_view: some View { VStack { - ProfilePicView(pubkey: event.pubkey, size: 32, highlight: .none, profiles: damus_state.profiles, disable_animation: disable_animation) + ProfilePicView(pubkey: event.pubkey, size: 32, highlight: .none, profiles: damus_state.profiles, disable_animation: disable_animation, damusState: damus_state) .onTapGesture { show_profile_action_sheet_if_enabled(damus_state: damus_state, pubkey: event.pubkey) } diff --git a/damus/Features/Chat/Models/ThreadModel.swift b/damus/Features/Chat/Models/ThreadModel.swift index b414b945..871aa126 100644 --- a/damus/Features/Chat/Models/ThreadModel.swift +++ b/damus/Features/Chat/Models/ThreadModel.swift @@ -115,18 +115,8 @@ class ThreadModel: ObservableObject { self.listener?.cancel() self.listener = Task { Log.info("subscribing to thread %s ", for: .render, original_event.id.hex()) - for await item in damus_state.nostrNetwork.reader.subscribe(filters: base_filters + meta_filters) { - switch item { - case .event(let lender): - lender.justUseACopy({ handle_event(ev: $0) }) - case .eose: - guard let txn = NdbTxn(ndb: damus_state.ndb) else { return } - load_profiles(context: "thread", load: .from_events(Array(event_map.events)), damus_state: damus_state, txn: txn) - case .ndbEose: - break - case .networkEose: - break - } + for await event in damus_state.nostrNetwork.reader.streamIndefinitely(filters: base_filters + meta_filters) { + event.justUseACopy({ handle_event(ev: $0) }) } } } diff --git a/damus/Features/Chat/ReplyQuoteView.swift b/damus/Features/Chat/ReplyQuoteView.swift index d69bd8fd..62b60a96 100644 --- a/damus/Features/Chat/ReplyQuoteView.swift +++ b/damus/Features/Chat/ReplyQuoteView.swift @@ -26,7 +26,7 @@ struct ReplyQuoteView: View { VStack(alignment: .leading) { HStack(alignment: .center) { if can_show_event { - ProfilePicView(pubkey: event.pubkey, size: 14, highlight: .reply, profiles: state.profiles, disable_animation: false) + ProfilePicView(pubkey: event.pubkey, size: 14, highlight: .reply, profiles: state.profiles, disable_animation: false, damusState: state) let blur_images = should_blur_images(settings: state.settings, contacts: state.contacts, ev: event, our_pubkey: state.pubkey) NoteContentView(damus_state: state, event: event, blur_images: blur_images, size: .small, options: options) .font(.callout) diff --git a/damus/Features/DMs/Views/DMChatView.swift b/damus/Features/DMs/Views/DMChatView.swift index 6804162b..39a74936 100644 --- a/damus/Features/DMs/Views/DMChatView.swift +++ b/damus/Features/DMs/Views/DMChatView.swift @@ -63,7 +63,7 @@ struct DMChatView: View, KeyboardReadable { var Header: some View { return NavigationLink(value: Route.ProfileByKey(pubkey: pubkey)) { HStack { - ProfilePicView(pubkey: pubkey, size: 24, highlight: .none, profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation) + ProfilePicView(pubkey: pubkey, size: 24, highlight: .none, profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation, damusState: damus_state) ProfileName(pubkey: pubkey, damus: damus_state) } diff --git a/damus/Features/Events/EventProfile.swift b/damus/Features/Events/EventProfile.swift index 66c281dd..d435e626 100644 --- a/damus/Features/Events/EventProfile.swift +++ b/damus/Features/Events/EventProfile.swift @@ -37,7 +37,7 @@ struct EventProfile: View { var body: some View { HStack(alignment: .center, spacing: 10) { - ProfilePicView(pubkey: pubkey, size: pfp_size, highlight: .none, profiles: damus_state.profiles, disable_animation: disable_animation, show_zappability: true) + ProfilePicView(pubkey: pubkey, size: pfp_size, highlight: .none, profiles: damus_state.profiles, disable_animation: disable_animation, show_zappability: true, damusState: damus_state) .onTapGesture { show_profile_action_sheet_if_enabled(damus_state: damus_state, pubkey: pubkey) } diff --git a/damus/Features/Events/Models/EventsModel.swift b/damus/Features/Events/Models/EventsModel.swift index 9a0fd1ea..fcd94dc8 100644 --- a/damus/Features/Events/Models/EventsModel.swift +++ b/damus/Features/Events/Models/EventsModel.swift @@ -71,7 +71,7 @@ class EventsModel: ObservableObject { loadingTask?.cancel() loadingTask = Task { DispatchQueue.main.async { self.loading = true } - outerLoop: for await item in state.nostrNetwork.reader.subscribe(filters: [get_filter()]) { + outerLoop: for await item in state.nostrNetwork.reader.advancedStream(filters: [get_filter()]) { switch item { case .event(let lender): Task { @@ -91,8 +91,6 @@ class EventsModel: ObservableObject { } } DispatchQueue.main.async { self.loading = false } - guard let txn = NdbTxn(ndb: self.state.ndb) else { return } - load_profiles(context: "events_model", load: .from_events(events.all_events), damus_state: state, txn: txn) } } diff --git a/damus/Features/FollowPack/Models/FollowPackModel.swift b/damus/Features/FollowPack/Models/FollowPackModel.swift index 80a66d8e..a66e669b 100644 --- a/damus/Features/FollowPack/Models/FollowPackModel.swift +++ b/damus/Features/FollowPack/Models/FollowPackModel.swift @@ -43,27 +43,18 @@ class FollowPackModel: ObservableObject { filter.authors = follow_pack_users filter.limit = 500 - for await item in damus_state.nostrNetwork.reader.subscribe(filters: [filter], to: to_relays) { - switch item { - case .event(lender: let lender): - await lender.justUseACopy({ event in - let should_show_event = await should_show_event(state: damus_state, ev: event) - if event.is_textlike && should_show_event && !event.is_reply() - { - if await self.events.insert(event) { - DispatchQueue.main.async { - self.objectWillChange.send() - } + for await event in damus_state.nostrNetwork.reader.streamIndefinitely(filters: [filter], to: to_relays) { + await event.justUseACopy({ event in + let should_show_event = await should_show_event(state: damus_state, ev: event) + if event.is_textlike && should_show_event && !event.is_reply() + { + if await self.events.insert(event) { + DispatchQueue.main.async { + self.objectWillChange.send() } } - }) - case .eose: - continue - case .ndbEose: - continue - case .networkEose: - continue - } + } + }) } } } diff --git a/damus/Features/FollowPack/Views/FollowPackPreview.swift b/damus/Features/FollowPack/Views/FollowPackPreview.swift index 244eea46..c31dd6a3 100644 --- a/damus/Features/FollowPack/Views/FollowPackPreview.swift +++ b/damus/Features/FollowPack/Views/FollowPackPreview.swift @@ -153,7 +153,7 @@ struct FollowPackPreviewBody: View { } HStack(alignment: .center) { - ProfilePicView(pubkey: event.event.pubkey, size: 25, highlight: .none, profiles: state.profiles, disable_animation: state.settings.disable_animation, show_zappability: true) + ProfilePicView(pubkey: event.event.pubkey, size: 25, highlight: .none, profiles: state.profiles, disable_animation: state.settings.disable_animation, show_zappability: true, damusState: state) .onTapGesture { state.nav.push(route: Route.ProfileByKey(pubkey: event.event.pubkey)) } diff --git a/damus/Features/FollowPack/Views/FollowPackView.swift b/damus/Features/FollowPack/Views/FollowPackView.swift index 301dd30d..22854a23 100644 --- a/damus/Features/FollowPack/Views/FollowPackView.swift +++ b/damus/Features/FollowPack/Views/FollowPackView.swift @@ -131,7 +131,7 @@ struct FollowPackView: View { } HStack(alignment: .center) { - ProfilePicView(pubkey: event.event.pubkey, size: 25, highlight: .none, profiles: state.profiles, disable_animation: state.settings.disable_animation, show_zappability: true) + ProfilePicView(pubkey: event.event.pubkey, size: 25, highlight: .none, profiles: state.profiles, disable_animation: state.settings.disable_animation, show_zappability: true, damusState: state) .onTapGesture { state.nav.push(route: Route.ProfileByKey(pubkey: event.event.pubkey)) } diff --git a/damus/Features/Follows/Models/FollowersModel.swift b/damus/Features/Follows/Models/FollowersModel.swift index 082025e0..b1ab15ab 100644 --- a/damus/Features/Follows/Models/FollowersModel.swift +++ b/damus/Features/Follows/Models/FollowersModel.swift @@ -38,18 +38,8 @@ class FollowersModel: ObservableObject { let filters = [filter] self.listener?.cancel() self.listener = Task { - for await item in damus_state.nostrNetwork.reader.subscribe(filters: filters) { - switch item { - case .event(let lender): - lender.justUseACopy({ self.handle_event(ev: $0) }) - case .eose: - guard let txn = NdbTxn(ndb: self.damus_state.ndb) else { return } - load_profiles(txn: txn) - case .ndbEose: - continue - case .networkEose: - continue - } + for await lender in damus_state.nostrNetwork.reader.streamIndefinitely(filters: filters) { + lender.justUseACopy({ self.handle_event(ev: $0) }) } } } @@ -70,31 +60,6 @@ class FollowersModel: ObservableObject { contacts?.append(ev.pubkey) has_contact.insert(ev.pubkey) } - - func load_profiles(txn: NdbTxn) { - let authors = find_profiles_to_fetch_from_keys(profiles: damus_state.profiles, pks: contacts ?? [], txn: txn) - if authors.isEmpty { - return - } - - let filter = NostrFilter(kinds: [.metadata], - authors: authors) - - self.profilesListener?.cancel() - self.profilesListener = Task { - for await item in await damus_state.nostrNetwork.reader.subscribe(filters: [filter]) { - switch item { - case .event(let lender): - lender.justUseACopy({ self.handle_event(ev: $0) }) - case .eose: break - case .ndbEose: - continue - case .networkEose: - continue - } - } - } - } func handle_event(ev: NostrEvent) { if ev.known_kind == .contacts { diff --git a/damus/Features/Follows/Models/FollowingModel.swift b/damus/Features/Follows/Models/FollowingModel.swift index 59a547ac..39e79b3b 100644 --- a/damus/Features/Follows/Models/FollowingModel.swift +++ b/damus/Features/Follows/Models/FollowingModel.swift @@ -43,7 +43,7 @@ class FollowingModel { let filters = [filter] self.listener?.cancel() self.listener = Task { - for await item in self.damus_state.nostrNetwork.reader.subscribe(filters: filters) { + for await item in self.damus_state.nostrNetwork.reader.advancedStream(filters: filters) { // don't need to do anything here really continue } diff --git a/damus/Features/NIP05/Models/NIP05DomainEventsModel.swift b/damus/Features/NIP05/Models/NIP05DomainEventsModel.swift index 137cc787..d5072b36 100644 --- a/damus/Features/NIP05/Models/NIP05DomainEventsModel.swift +++ b/damus/Features/NIP05/Models/NIP05DomainEventsModel.swift @@ -64,13 +64,11 @@ class NIP05DomainEventsModel: ObservableObject { filter.authors = Array(authors) - for await item in state.nostrNetwork.reader.subscribe(filters: [filter]) { + for await item in state.nostrNetwork.reader.advancedStream(filters: [filter]) { switch item { case .event(let lender): await lender.justUseACopy({ await self.add_event($0) }) case .eose: - guard let txn = NdbTxn(ndb: state.ndb) else { return } - load_profiles(context: "search", load: .from_events(self.events.all_events), damus_state: state, txn: txn) DispatchQueue.main.async { self.loading = false } continue case .ndbEose: diff --git a/damus/Features/Notifications/Views/ProfilePicturesView.swift b/damus/Features/Notifications/Views/ProfilePicturesView.swift index 1c274349..ad8c23ee 100644 --- a/damus/Features/Notifications/Views/ProfilePicturesView.swift +++ b/damus/Features/Notifications/Views/ProfilePicturesView.swift @@ -14,7 +14,7 @@ struct ProfilePicturesView: View { var body: some View { HStack { ForEach(pubkeys.prefix(8), id: \.self) { pubkey in - ProfilePicView(pubkey: pubkey, size: 32.0, highlight: .none, profiles: state.profiles, disable_animation: state.settings.disable_animation) + ProfilePicView(pubkey: pubkey, size: 32.0, highlight: .none, profiles: state.profiles, disable_animation: state.settings.disable_animation, damusState: state) .onTapGesture { state.nav.push(route: Route.ProfileByKey(pubkey: pubkey)) } diff --git a/damus/Features/Onboarding/SuggestedUsersViewModel.swift b/damus/Features/Onboarding/SuggestedUsersViewModel.swift index 3945765f..65ab22fd 100644 --- a/damus/Features/Onboarding/SuggestedUsersViewModel.swift +++ b/damus/Features/Onboarding/SuggestedUsersViewModel.swift @@ -189,7 +189,7 @@ class SuggestedUsersViewModel: ObservableObject { authors: [Constants.ONBOARDING_FOLLOW_PACK_CURATOR_PUBKEY] ) - for await lender in self.damus_state.nostrNetwork.reader.streamNotesUntilEndOfStoredEvents(filters: [filter]) { + for await lender in self.damus_state.nostrNetwork.reader.streamExistingEvents(filters: [filter]) { // Check for cancellation on each iteration guard !Task.isCancelled else { break } @@ -212,6 +212,7 @@ class SuggestedUsersViewModel: ObservableObject { } /// Finds all profiles mentioned in the follow packs, and loads the profile data from the network + // TODO LOCAL_RELAY_PROFILE: Remove this private func loadProfiles(for packs: [FollowPackEvent]) async { var allPubkeys: [Pubkey] = [] @@ -223,7 +224,7 @@ class SuggestedUsersViewModel: ObservableObject { } let profileFilter = NostrFilter(kinds: [.metadata], authors: allPubkeys) - for await _ in damus_state.nostrNetwork.reader.streamNotesUntilEndOfStoredEvents(filters: [profileFilter]) { + for await _ in damus_state.nostrNetwork.reader.streamExistingEvents(filters: [profileFilter]) { // NO-OP. We just need NostrDB to ingest these for them to be available elsewhere, no need to analyze the data } } diff --git a/damus/Features/Posting/Views/PostView.swift b/damus/Features/Posting/Views/PostView.swift index cfec7833..1f9672a4 100644 --- a/damus/Features/Posting/Views/PostView.swift +++ b/damus/Features/Posting/Views/PostView.swift @@ -388,7 +388,7 @@ struct PostView: View { HStack(alignment: .top, spacing: 0) { VStack(alignment: .leading, spacing: 0) { HStack(alignment: .top) { - ProfilePicView(pubkey: damus_state.pubkey, size: PFP_SIZE, highlight: .none, profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation) + ProfilePicView(pubkey: damus_state.pubkey, size: PFP_SIZE, highlight: .none, profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation, damusState: damus_state) VStack(alignment: .leading) { if let prompt_view { diff --git a/damus/Features/Profile/Models/CondensedProfilePicturesViewModel.swift b/damus/Features/Profile/Models/CondensedProfilePicturesViewModel.swift index a8c35f07..166f2f1f 100644 --- a/damus/Features/Profile/Models/CondensedProfilePicturesViewModel.swift +++ b/damus/Features/Profile/Models/CondensedProfilePicturesViewModel.swift @@ -21,22 +21,4 @@ class CondensedProfilePicturesViewModel: ObservableObject { self.pubkeys = pubkeys self.maxPictures = min(maxPictures, pubkeys.count) } - - func load() { - loadingTask?.cancel() - loadingTask = Task { try? await loadingTask() } - } - - func loadingTask() async throws { - let filter = NostrFilter(kinds: [.metadata], authors: shownPubkeys) - let _ = await state.nostrNetwork.reader.query(filters: [filter]) - for await _ in state.nostrNetwork.reader.streamNotesUntilEndOfStoredEvents(filters: [filter]) { - // NO-OP, we just need it to be loaded into NostrDB. - try Task.checkCancellation() - } - DispatchQueue.main.async { - // Cause the view to re-render with the newly loaded profiles - self.objectWillChange.send() - } - } } diff --git a/damus/Features/Profile/Models/ProfileModel.swift b/damus/Features/Profile/Models/ProfileModel.swift index eb281c13..dae8ea95 100644 --- a/damus/Features/Profile/Models/ProfileModel.swift +++ b/damus/Features/Profile/Models/ProfileModel.swift @@ -76,34 +76,21 @@ class ProfileModel: ObservableObject, Equatable { var text_filter = NostrFilter(kinds: [.text, .longform, .highlight]) text_filter.authors = [pubkey] text_filter.limit = 500 - for await item in damus.nostrNetwork.reader.subscribe(filters: [text_filter]) { - switch item { - case .event(let lender): - lender.justUseACopy({ handleNostrEvent($0) }) - case .eose: break - case .ndbEose: break - case .networkEose: break - } - } - guard let txn = NdbTxn(ndb: damus.ndb) else { return } - load_profiles(context: "profile", load: .from_events(events.events), damus_state: damus, txn: txn) await bumpUpProgress() + for await event in damus.nostrNetwork.reader.streamIndefinitely(filters: [text_filter]) { + event.justUseACopy({ handleNostrEvent($0) }) + } } profileListener?.cancel() profileListener = Task { var profile_filter = NostrFilter(kinds: [.contacts, .metadata, .boost]) var relay_list_filter = NostrFilter(kinds: [.relay_list], authors: [pubkey]) profile_filter.authors = [pubkey] - for await item in damus.nostrNetwork.reader.subscribe(filters: [profile_filter, relay_list_filter]) { - switch item { - case .event(let lender): - lender.justUseACopy({ handleNostrEvent($0) }) - case .eose: break - case .ndbEose: break - case .networkEose: break - } - } await bumpUpProgress() + for await event in damus.nostrNetwork.reader.streamIndefinitely(filters: [profile_filter, relay_list_filter]) { + event.justUseACopy({ handleNostrEvent($0) }) + } + } conversationListener?.cancel() conversationListener = Task { @@ -127,25 +114,16 @@ class ProfileModel: ObservableObject, Equatable { let conversations_filter_them = NostrFilter(kinds: conversation_kinds, pubkeys: [damus.pubkey], limit: limit, authors: [pubkey]) let conversations_filter_us = NostrFilter(kinds: conversation_kinds, pubkeys: [pubkey], limit: limit, authors: [damus.pubkey]) print("subscribing to conversation events from and to profile \(pubkey)") - for await item in self.damus.nostrNetwork.reader.subscribe(filters: [conversations_filter_them, conversations_filter_us]) { - switch item { - case .event(let lender): - try? lender.borrow { ev in - if !seen_event.contains(ev.id) { - let event = ev.toOwned() - Task { await self.add_event(event) } - conversation_events.insert(ev.id) - } - else if !conversation_events.contains(ev.id) { - conversation_events.insert(ev.id) - } + for await noteLender in self.damus.nostrNetwork.reader.streamIndefinitely(filters: [conversations_filter_them, conversations_filter_us]) { + try? noteLender.borrow { ev in + if !seen_event.contains(ev.id) { + let event = ev.toOwned() + Task { await self.add_event(event) } + conversation_events.insert(ev.id) + } + else if !conversation_events.contains(ev.id) { + conversation_events.insert(ev.id) } - case .eose: - continue - case .ndbEose: - continue - case .networkEose: - continue } } } @@ -212,21 +190,12 @@ class ProfileModel: ObservableObject, Equatable { profile_filter.authors = [pubkey] self.findRelaysListener?.cancel() self.findRelaysListener = Task { - for await item in await damus.nostrNetwork.reader.subscribe(filters: [profile_filter]) { - switch item { - case .event(let lender): - try? lender.borrow { event in - if case .contacts = event.known_kind { - // TODO: Is this correct? - self.legacy_relay_list = decode_json_relays(event.content) - } + for await noteLender in damus.nostrNetwork.reader.streamIndefinitely(filters: [profile_filter]) { + try? noteLender.borrow { event in + if case .contacts = event.known_kind { + // TODO: Is this correct? + self.legacy_relay_list = decode_json_relays(event.content) } - case .eose: - break - case .ndbEose: - break - case .networkEose: - break } } } diff --git a/damus/Features/Profile/Views/CondensedProfilePicturesView.swift b/damus/Features/Profile/Views/CondensedProfilePicturesView.swift index a04666fd..237286fb 100644 --- a/damus/Features/Profile/Views/CondensedProfilePicturesView.swift +++ b/damus/Features/Profile/Views/CondensedProfilePicturesView.swift @@ -18,16 +18,12 @@ struct CondensedProfilePicturesView: View { // Using ZStack to make profile pictures floating and stacked on top of each other. ZStack { ForEach((0..(context: String, load: PubkeysToLoad, damus_state: DamusState, txn: NdbTxn) -> Task? { - let authors = find_profiles_to_fetch(profiles: damus_state.profiles, load: load, cache: damus_state.events, txn: txn) - - guard !authors.isEmpty else { - return nil - } - - return Task { - print("load_profiles[\(context)]: requesting \(authors.count) profiles from relay pool") - let filter = NostrFilter(kinds: [.metadata], authors: authors) - - for await noteLender in damus_state.nostrNetwork.reader.streamNotesUntilEndOfStoredEvents(filters: [filter]) { - let now = UInt64(Date.now.timeIntervalSince1970) - try noteLender.borrow { event in - if event.known_kind == .metadata { - damus_state.ndb.write_profile_last_fetched(pubkey: event.pubkey, fetched_at: now) - } - } - } - - print("load_profiles[\(context)]: done loading \(authors.count) profiles from relay pool") - } -} - diff --git a/damus/Features/Search/Models/SearchModel.swift b/damus/Features/Search/Models/SearchModel.swift index 3547f463..41964f36 100644 --- a/damus/Features/Search/Models/SearchModel.swift +++ b/damus/Features/Search/Models/SearchModel.swift @@ -54,9 +54,7 @@ class SearchModel: ObservableObject { } } - guard let txn = NdbTxn(ndb: state.ndb) else { return } try Task.checkCancellation() - load_profiles(context: "search", load: .from_events(self.events.all_events), damus_state: state, txn: txn) DispatchQueue.main.async { self.loading = false } diff --git a/damus/Features/Status/Views/UserStatusSheet.swift b/damus/Features/Status/Views/UserStatusSheet.swift index f2c2dd88..dbff37bf 100644 --- a/damus/Features/Status/Views/UserStatusSheet.swift +++ b/damus/Features/Status/Views/UserStatusSheet.swift @@ -129,7 +129,7 @@ struct UserStatusSheet: View { Divider() ZStack(alignment: .top) { - ProfilePicView(pubkey: keypair.pubkey, size: 120.0, highlight: .custom(DamusColors.white, 3.0), profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation) + ProfilePicView(pubkey: keypair.pubkey, size: 120.0, highlight: .custom(DamusColors.white, 3.0), profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation, damusState: damus_state) .padding(.top, 30) VStack(spacing: 0) { diff --git a/damus/Features/Timeline/Models/HomeModel.swift b/damus/Features/Timeline/Models/HomeModel.swift index 1b3223d9..125fffa5 100644 --- a/damus/Features/Timeline/Models/HomeModel.swift +++ b/damus/Features/Timeline/Models/HomeModel.swift @@ -454,22 +454,12 @@ class HomeModel: ContactsDelegate, ObservableObject { let id = UUID() Log.info("Initial filter task started with ID %s", for: .homeModel, id.uuidString) let filter = NostrFilter(kinds: [.contacts], limit: 1, authors: [damus_state.pubkey]) - for await item in damus_state.nostrNetwork.reader.subscribe(filters: [filter]) { - switch item { - case .event(let lender): - await lender.justUseACopy({ await process_event(ev: $0, context: .initialContactList) }) - continue - case .eose: - if !done_init { - done_init = true - Log.info("Initial filter task %s: Done initialization; Elapsed time: %.2f seconds", for: .homeModel, id.uuidString, CFAbsoluteTimeGetCurrent() - startTime) - send_home_filters() - } - break - case .ndbEose: - break - case .networkEose: - break + for await event in damus_state.nostrNetwork.reader.streamIndefinitely(filters: [filter]) { + await event.justUseACopy({ await process_event(ev: $0, context: .initialContactList) }) + if !done_init { + done_init = true + Log.info("Initial filter task %s: Done initialization; Elapsed time: %.2f seconds", for: .homeModel, id.uuidString, CFAbsoluteTimeGetCurrent() - startTime) + send_home_filters() } } @@ -477,14 +467,8 @@ class HomeModel: ContactsDelegate, ObservableObject { Task { let relayListFilter = NostrFilter(kinds: [.relay_list], limit: 1, authors: [damus_state.pubkey]) - for await item in damus_state.nostrNetwork.reader.subscribe(filters: [relayListFilter]) { - switch item { - case .event(let lender): - await lender.justUseACopy({ await process_event(ev: $0, context: .initialRelayList) }) - case .eose: break - case .ndbEose: break - case .networkEose: break - } + for await event in damus_state.nostrNetwork.reader.streamIndefinitely(filters: [relayListFilter]) { + await event.justUseACopy({ await process_event(ev: $0, context: .initialRelayList) }) } } } @@ -543,41 +527,25 @@ class HomeModel: ContactsDelegate, ObservableObject { self.contactsHandlerTask?.cancel() self.contactsHandlerTask = Task { - for await item in damus_state.nostrNetwork.reader.subscribe(filters: contacts_filters) { - switch item { - case .event(let lender): - await lender.justUseACopy({ await process_event(ev: $0, context: .contacts) }) - case .eose: continue - case .ndbEose: continue - case .networkEose: continue - } + for await event in damus_state.nostrNetwork.reader.streamIndefinitely(filters: contacts_filters) { + await event.justUseACopy({ await process_event(ev: $0, context: .contacts) }) } } self.notificationsHandlerTask?.cancel() self.notificationsHandlerTask = Task { - for await item in damus_state.nostrNetwork.reader.subscribe(filters: notifications_filters) { - switch item { - case .event(let lender): - await lender.justUseACopy({ await process_event(ev: $0, context: .notifications) }) - case .eose: - guard let txn = NdbTxn(ndb: damus_state.ndb) else { return } - load_profiles(context: "notifications", load: .from_keys(notifications.uniq_pubkeys()), damus_state: damus_state, txn: txn) - case .ndbEose: break - case .networkEose: break - } + for await event in damus_state.nostrNetwork.reader.streamIndefinitely(filters: notifications_filters) { + await event.justUseACopy({ await process_event(ev: $0, context: .notifications) }) } } self.dmsHandlerTask?.cancel() self.dmsHandlerTask = Task { - for await item in damus_state.nostrNetwork.reader.subscribe(filters: dms_filters) { + for await item in damus_state.nostrNetwork.reader.advancedStream(filters: dms_filters) { switch item { case .event(let lender): await lender.justUseACopy({ await process_event(ev: $0, context: .dms) }) case .eose: - guard let txn = NdbTxn(ndb: damus_state.ndb) else { return } var dms = dms.dms.flatMap { $0.events } dms.append(contentsOf: incoming_dms) - load_profiles(context: "dms", load: .from_events(dms), damus_state: damus_state, txn: txn) case .ndbEose: break case .networkEose: break } @@ -591,14 +559,8 @@ class HomeModel: ContactsDelegate, ObservableObject { var filter = NostrFilter(kinds: [.nwc_response]) filter.authors = [nwc.pubkey] filter.limit = 0 - for await item in damus_state.nostrNetwork.reader.subscribe(filters: [filter], to: [nwc.relay]) { - switch item { - case .event(let lender): - await lender.justUseACopy({ await process_event(ev: $0, context: .nwc) }) - case .eose: continue - case .ndbEose: continue - case .networkEose: continue - } + for await event in damus_state.nostrNetwork.reader.streamIndefinitely(filters: [filter], to: [nwc.relay]) { + await event.justUseACopy({ await process_event(ev: $0, context: .nwc) }) } } @@ -653,7 +615,7 @@ class HomeModel: ContactsDelegate, ObservableObject { DispatchQueue.main.async { self.loading = true } - for await item in damus_state.nostrNetwork.reader.subscribe(filters: home_filters, id: id) { + for await item in damus_state.nostrNetwork.reader.advancedStream(filters: home_filters, id: id) { switch item { case .event(let lender): let currentTime = CFAbsoluteTimeGetCurrent() @@ -664,20 +626,15 @@ class HomeModel: ContactsDelegate, ObservableObject { let eoseTime = CFAbsoluteTimeGetCurrent() Log.info("Home handler task %s: Received general EOSE after %.2f seconds", for: .homeModel, id.uuidString, eoseTime - startTime) - guard let txn = NdbTxn(ndb: damus_state.ndb) else { return } - load_profiles(context: "home", load: .from_events(events.events), damus_state: damus_state, txn: txn) - let finishTime = CFAbsoluteTimeGetCurrent() Log.info("Home handler task %s: Completed initial loading task after %.2f seconds", for: .homeModel, id.uuidString, eoseTime - startTime) case .ndbEose: let eoseTime = CFAbsoluteTimeGetCurrent() Log.info("Home handler task %s: Received NDB EOSE after %.2f seconds", for: .homeModel, id.uuidString, eoseTime - startTime) - - guard let txn = NdbTxn(ndb: damus_state.ndb) else { return } + DispatchQueue.main.async { self.loading = false } - load_profiles(context: "home", load: .from_events(events.events), damus_state: damus_state, txn: txn) let finishTime = CFAbsoluteTimeGetCurrent() Log.info("Home handler task %s: Completed initial NDB loading task after %.2f seconds", for: .homeModel, id.uuidString, eoseTime - startTime) diff --git a/damus/Features/Timeline/Views/SideMenuView.swift b/damus/Features/Timeline/Views/SideMenuView.swift index a6cbe8dc..d001e9a3 100644 --- a/damus/Features/Timeline/Views/SideMenuView.swift +++ b/damus/Features/Timeline/Views/SideMenuView.swift @@ -104,7 +104,7 @@ struct SideMenuView: View { return VStack(alignment: .leading) { HStack(spacing: 10) { - ProfilePicView(pubkey: damus_state.pubkey, size: 50, highlight: .none, profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation) + ProfilePicView(pubkey: damus_state.pubkey, size: 50, highlight: .none, profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation, damusState: damus_state) Spacer() diff --git a/damus/Features/Wallet/Models/WalletModel.swift b/damus/Features/Wallet/Models/WalletModel.swift index f8036485..ecde5121 100644 --- a/damus/Features/Wallet/Models/WalletModel.swift +++ b/damus/Features/Wallet/Models/WalletModel.swift @@ -182,27 +182,18 @@ class WalletModel: ObservableObject { ] nostrNetwork.send(event: requestEvent, to: [currentNwcUrl.relay], skipEphemeralRelays: false) - for await item in nostrNetwork.reader.subscribe(filters: responseFilters, to: [currentNwcUrl.relay], timeout: timeout) { - switch item { - case .event(let lender): - guard let responseEvent = try? lender.getCopy() else { throw .internalError } - - let fullWalletResponse: WalletConnect.FullWalletResponse - do { fullWalletResponse = try WalletConnect.FullWalletResponse(from: responseEvent, nwc: currentNwcUrl) } - catch { throw WalletRequestError.walletResponseDecodingError(error) } - - guard fullWalletResponse.req_id == requestEvent.id else { continue } // Our filters may match other responses - if let responseError = fullWalletResponse.response.error { throw .walletResponseError(responseError) } - - guard let result = fullWalletResponse.response.result else { throw .walletEmptyResponse } - return result - case .eose: - continue - case .ndbEose: - continue - case .networkEose: - continue - } + for await event in nostrNetwork.reader.timedStream(filters: responseFilters, to: [currentNwcUrl.relay], timeout: timeout) { + guard let responseEvent = try? event.getCopy() else { throw .internalError } + + let fullWalletResponse: WalletConnect.FullWalletResponse + do { fullWalletResponse = try WalletConnect.FullWalletResponse(from: responseEvent, nwc: currentNwcUrl) } + catch { throw WalletRequestError.walletResponseDecodingError(error) } + + guard fullWalletResponse.req_id == requestEvent.id else { continue } // Our filters may match other responses + if let responseError = fullWalletResponse.response.error { throw .walletResponseError(responseError) } + + guard let result = fullWalletResponse.response.result else { throw .walletEmptyResponse } + return result } do { try Task.checkCancellation() } catch { throw .cancelled } throw .responseTimeout diff --git a/damus/Features/Wallet/Views/TransactionsView.swift b/damus/Features/Wallet/Views/TransactionsView.swift index e85c392e..c71ccade 100644 --- a/damus/Features/Wallet/Views/TransactionsView.swift +++ b/damus/Features/Wallet/Views/TransactionsView.swift @@ -30,7 +30,7 @@ struct TransactionView: View { VStack(alignment: .leading) { HStack(alignment: .center) { ZStack { - ProfilePicView(pubkey: pubkey ?? ANON_PUBKEY, size: 45, highlight: .custom(.damusAdaptableBlack, 0.1), profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation, privacy_sensitive: true) + ProfilePicView(pubkey: pubkey ?? ANON_PUBKEY, size: 45, highlight: .custom(.damusAdaptableBlack, 0.1), profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation, privacy_sensitive: true, damusState: damus_state) .onTapGesture { if let pubkey { damus_state.nav.push(route: Route.ProfileByKey(pubkey: pubkey)) diff --git a/damus/Features/Zaps/Models/ZapsModel.swift b/damus/Features/Zaps/Models/ZapsModel.swift index 9eb063c5..760472ec 100644 --- a/damus/Features/Zaps/Models/ZapsModel.swift +++ b/damus/Features/Zaps/Models/ZapsModel.swift @@ -33,21 +33,8 @@ class ZapsModel: ObservableObject { } zapCommsListener?.cancel() zapCommsListener = Task { - for await item in state.nostrNetwork.reader.subscribe(filters: [filter]) { - switch item { - case .event(let lender): - await lender.justUseACopy({ event in - await self.handle_event(ev: event) - }) - case .eose: - let events = state.events.lookup_zaps(target: target).map { $0.request.ev } - guard let txn = NdbTxn(ndb: state.ndb) else { return } - load_profiles(context: "zaps_model", load: .from_events(events), damus_state: state, txn: txn) - case .ndbEose: - break - case .networkEose: - break - } + for await event in state.nostrNetwork.reader.streamIndefinitely(filters: [filter]) { + await event.justUseACopy({ await self.handle_event(ev: $0) }) } } } diff --git a/damus/Shared/Components/QRCodeView.swift b/damus/Shared/Components/QRCodeView.swift index 343f1dd9..03aa2a26 100644 --- a/damus/Shared/Components/QRCodeView.swift +++ b/damus/Shared/Components/QRCodeView.swift @@ -76,7 +76,7 @@ struct QRCodeView: View { let profile_txn = damus_state.profiles.lookup(id: pubkey, txn_name: "qrview-profile") let profile = profile_txn?.unsafeUnownedValue - ProfilePicView(pubkey: pubkey, size: 90.0, highlight: .custom(DamusColors.white, 3.0), profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation) + ProfilePicView(pubkey: pubkey, size: 90.0, highlight: .custom(DamusColors.white, 3.0), profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation, damusState: damus_state) .padding(.top, 20) if let display_name = profile?.display_name { diff --git a/damus/Shared/Components/UserView.swift b/damus/Shared/Components/UserView.swift index c83b0c0a..492c377c 100644 --- a/damus/Shared/Components/UserView.swift +++ b/damus/Shared/Components/UserView.swift @@ -34,7 +34,7 @@ struct UserView: View { var body: some View { VStack { HStack { - ProfilePicView(pubkey: pubkey, size: PFP_SIZE, highlight: .none, profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation) + ProfilePicView(pubkey: pubkey, size: PFP_SIZE, highlight: .none, profiles: damus_state.profiles, disable_animation: damus_state.settings.disable_animation, damusState: damus_state) VStack(alignment: .leading) { ProfileName(pubkey: pubkey, damus: damus_state, show_nip5_domain: false) diff --git a/nostrdb/UnownedNdbNote.swift b/nostrdb/UnownedNdbNote.swift index 550c4b71..79ef237b 100644 --- a/nostrdb/UnownedNdbNote.swift +++ b/nostrdb/UnownedNdbNote.swift @@ -110,7 +110,7 @@ enum NdbNoteLender: Sendable { return try self.getCopy() } catch { - assertionFailure("Unexpected error while fetching a copy of an NdbNote: \(error.localizedDescription)") +// assertionFailure("Unexpected error while fetching a copy of an NdbNote: \(error.localizedDescription)") Log.error("Unexpected error while fetching a copy of an NdbNote: %s", for: .ndb, error.localizedDescription) } return nil