Add EntityPreloader for batched profile metadata preloading
Implements an actor-based preloading system to efficiently fetch profile metadata for note authors and referenced users. The EntityPreloader queues requests and batches them intelligently (500 pubkeys or 1 second timeout) to avoid network overload while improving UX by ensuring profiles are available when rendering notes.

Key changes:
- Add EntityPreloader actor with queue-based batching logic
- Integrate with SubscriptionManager via PreloadStrategy enum
- Add lifecycle management (start/stop on app foreground/background)
- Skip preload for pubkeys already cached in ndb
- Include comprehensive test suite with 11 test cases covering batching, deduplication, and edge cases
- Optimize ProfilePicView to load from ndb before first render

Closes: https://github.com/damus-io/damus/issues/gh-3511
Changelog-Added: Profile metadata preloading for improved timeline performance
Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
196
damus/Core/Networking/NostrNetworkManager/EntityPreloader.swift
Normal file
196
damus/Core/Networking/NostrNetworkManager/EntityPreloader.swift
Normal file
@@ -0,0 +1,196 @@
|
||||
//
|
||||
// EntityPreloader.swift
|
||||
// damus
|
||||
//
|
||||
// Created by Daniel D'Aquino on 2026-01-22.
|
||||
//
|
||||
|
||||
import Foundation
|
||||
import os
|
||||
import Negentropy
|
||||
|
||||
extension NostrNetworkManager {
    /// Preloads entities referenced in notes to improve user experience.
    ///
    /// This actor efficiently batches entity preload requests to avoid overloading the network.
    /// Currently limited to preloading profile metadata, but designed to be expanded to other
    /// entity types (e.g., referenced events, media) in the future.
    ///
    /// ## Implementation notes
    ///
    /// - Uses a queue to collect preload requests
    /// - Batches requests intelligently: either when 500 pending requests accumulate, or after 1 second
    /// - Uses standard Nostr subscriptions to fetch metadata
    /// - Runs a long-running task to process the queue continuously
    actor EntityPreloader {
        /// Relay pool used to issue the metadata subscriptions.
        private let pool: RelayPool
        /// Local nostrdb instance, consulted to skip already-cached profiles.
        private let ndb: Ndb
        /// Queue that fans incoming preload requests into `monitorQueue()`.
        private let queue: QueueableNotify<Set<Pubkey>>
        /// Long-running task driving `monitorQueue()`; `nil` when stopped.
        private var processingTask: Task<Void, Never>?
        /// Pubkeys collected but not yet fetched; flushed by batch size or timer.
        private var accumulatedPubkeys = Set<Pubkey>()

        private static let logger = Logger(
            subsystem: Constants.MAIN_APP_BUNDLE_IDENTIFIER,
            category: "entity_preloader"
        )

        /// Maximum number of items allowed in the queue before old items are discarded
        private static let maxQueueItems = 1000
        /// Batch size threshold - preload immediately when this many requests are pending
        private static let batchSizeThreshold = 500
        /// Time threshold - preload after this duration even if batch size not reached
        private static let timeThreshold: Duration = .seconds(1)

        /// Creates a preloader. Call `start()` before any preloading takes effect.
        ///
        /// - Parameters:
        ///   - pool: Relay pool to fetch metadata through
        ///   - ndb: Local nostrdb cache used to avoid redundant fetches
        init(pool: RelayPool, ndb: Ndb) {
            self.pool = pool
            self.ndb = ndb
            self.queue = QueueableNotify<Set<Pubkey>>(maxQueueItems: Self.maxQueueItems)
        }

        /// Starts the preloader's background processing task.
        ///
        /// Idempotent: calling while already running only logs a warning.
        func start() {
            guard processingTask == nil else {
                Self.logger.warning("EntityPreloader already started")
                return
            }

            Self.logger.info("Starting EntityPreloader")
            processingTask = Task {
                await monitorQueue()
            }
        }

        /// Stops the preloader's background processing task.
        ///
        /// Pubkeys already accumulated but not yet fetched are retained and will
        /// be flushed after the next `start()`.
        func stop() {
            Self.logger.info("Stopping EntityPreloader")
            processingTask?.cancel()
            processingTask = nil
        }

        /// Preloads metadata for the author and referenced profiles in a note
        ///
        /// Fire-and-forget: extraction and queueing run in a detached-from-caller
        /// `Task`, so this is cheap to call from streaming hot paths.
        ///
        /// - Parameter noteLender: The note to extract profiles from
        nonisolated func preload(note noteLender: NdbNoteLender) {
            Task {
                do {
                    let pubkeys = try noteLender.borrow { event in
                        if event.known_kind == .metadata { return Set<Pubkey>() } // Don't preload pubkeys from a user profile
                        var pubkeys = Set<Pubkey>()

                        // Add the author
                        pubkeys.insert(event.pubkey)

                        // Add all referenced pubkeys from p tags
                        for referencedPubkey in event.referenced_pubkeys {
                            pubkeys.insert(referencedPubkey)
                        }

                        return pubkeys
                    }

                    guard !pubkeys.isEmpty else { return }

                    // Filter out pubkeys that already have profiles in ndb
                    let pubkeysToPreload = await pubkeys.asyncFilter { pubkey in
                        let hasProfile = (try? await ndb.lookup_profile(pubkey, borrow: { pr in
                            pr != nil
                        })) ?? false
                        return !hasProfile
                    }

                    guard !pubkeysToPreload.isEmpty else {
                        Self.logger.debug("All \(pubkeys.count, privacy: .public) profiles already in ndb, skipping preload")
                        return
                    }

                    Self.logger.debug("Queueing preload for \(pubkeysToPreload.count, privacy: .public) profiles (\(pubkeys.count - pubkeysToPreload.count, privacy: .public) already cached)")
                    await queue.add(item: pubkeysToPreload)
                } catch {
                    Self.logger.error("Error extracting pubkeys from note: \(error.localizedDescription, privacy: .public)")
                }
            }
        }

        /// Processes the queue continuously, batching requests intelligently.
        ///
        /// Runs two structured children: a consumer that merges incoming requests
        /// (flushing on batch size) and a timer that flushes on `timeThreshold`.
        /// Both exit cooperatively on task cancellation.
        private func monitorQueue() async {
            // `of: Void.self` is spelled out for compatibility with toolchains
            // that do not default the child-task result type.
            await withThrowingTaskGroup(of: Void.self) { group in
                group.addTask {
                    for await newPubkeys in await self.queue.stream {
                        try Task.checkCancellation()
                        await self.handle(newQueueItem: newPubkeys)
                    }
                }

                group.addTask {
                    while !Task.isCancelled {
                        // Task.sleep throws CancellationError on cancel, ending this child.
                        try await Task.sleep(for: Self.timeThreshold)
                        await self.handleTimerTick()
                    }
                }
            }
        }

        /// Flushes any accumulated pubkeys on the periodic timer tick.
        private func handleTimerTick() async {
            if !accumulatedPubkeys.isEmpty {
                await self.performPreload()
            }
        }

        /// Merges a new batch of pubkeys into the accumulator, flushing as soon
        /// as the documented batch-size threshold is reached.
        private func handle(newQueueItem: Set<Pubkey>) async {
            accumulatedPubkeys.formUnion(newQueueItem)
            // `>=` (not `>`) so the flush fires when the threshold is reached,
            // matching "preload immediately when this many requests are pending".
            if accumulatedPubkeys.count >= Self.batchSizeThreshold {
                await self.performPreload()
            }
        }

        /// Takes the current batch, clears the accumulator, and fetches it.
        private func performPreload() async {
            let pubkeysToPreload = accumulatedPubkeys
            accumulatedPubkeys.removeAll()
            Self.logger.debug("Preloading \(pubkeysToPreload.count, privacy: .public) profiles")
            await self.performPreload(pubkeys: pubkeysToPreload)
        }

        /// Performs the actual preload operation using standard Nostr subscriptions.
        ///
        /// - Parameter pubkeys: The set of pubkeys to preload metadata for
        private func performPreload(pubkeys: Set<Pubkey>) async {
            guard !pubkeys.isEmpty else { return }

            Self.logger.debug("Starting preload for \(pubkeys.count, privacy: .public) pubkeys")

            let filter = NostrFilter(
                kinds: [.metadata],
                authors: Array(pubkeys)
            )

            for try await _ in await pool.subscribeExistingItems(
                filters: [filter],
                to: nil,
                eoseTimeout: .seconds(10)
            ) {
                // NO-OP: We are only subscribing to let nostrdb ingest those events, but we do not need special handling here.
                guard !Task.isCancelled else { break }
            }

            Self.logger.debug("Completed metadata fetch for \(pubkeys.count, privacy: .public) profiles")
        }
    }
}
|
||||
|
||||
// MARK: - Private Extensions
|
||||
|
||||
private extension Set {
    /// Returns a new set containing only the members the async predicate accepts.
    ///
    /// Members are evaluated one at a time, awaiting each predicate call in turn.
    ///
    /// - Parameter predicate: An async closure returning `true` for members to keep
    /// - Returns: The subset of `self` for which `predicate` returned `true`
    func asyncFilter(_ predicate: (Element) async -> Bool) async -> Set<Element> {
        var kept: Set<Element> = []
        kept.reserveCapacity(count)
        for member in self {
            guard await predicate(member) else { continue }
            kept.insert(member)
        }
        return kept
    }
}
|
||||
@@ -53,6 +53,7 @@ class NostrNetworkManager {
|
||||
func connect() async {
|
||||
await self.userRelayList.connect() // Will load the user's list, apply it, and get RelayPool to connect to it.
|
||||
await self.profilesManager.load()
|
||||
await self.reader.startPreloader()
|
||||
}
|
||||
|
||||
func disconnectRelays() async {
|
||||
@@ -61,12 +62,14 @@ class NostrNetworkManager {
|
||||
|
||||
func handleAppBackgroundRequest() async {
|
||||
await self.reader.cancelAllTasks()
|
||||
await self.reader.stopPreloader()
|
||||
await self.pool.cleanQueuedRequestForSessionEnd()
|
||||
}
|
||||
|
||||
func handleAppForegroundRequest() async {
|
||||
// Pinging the network will automatically reconnect any dead websocket connections
|
||||
await self.ping()
|
||||
await self.reader.startPreloader()
|
||||
}
|
||||
|
||||
func close() async {
|
||||
@@ -78,6 +81,9 @@ class NostrNetworkManager {
|
||||
group.addTask {
|
||||
await self.profilesManager.stop()
|
||||
}
|
||||
group.addTask {
|
||||
await self.reader.stopPreloader()
|
||||
}
|
||||
// But await on each one to prevent race conditions
|
||||
for await value in group { continue }
|
||||
await pool.close()
|
||||
|
||||
@@ -20,6 +20,7 @@ extension NostrNetworkManager {
|
||||
private var ndb: Ndb
|
||||
private var taskManager: TaskManager
|
||||
private let experimentalLocalRelayModelSupport: Bool
|
||||
private let entityPreloader: EntityPreloader
|
||||
|
||||
private static let logger = Logger(
|
||||
subsystem: Constants.MAIN_APP_BUNDLE_IDENTIFIER,
|
||||
@@ -31,16 +32,17 @@ extension NostrNetworkManager {
|
||||
self.ndb = ndb
|
||||
self.taskManager = TaskManager()
|
||||
self.experimentalLocalRelayModelSupport = experimentalLocalRelayModelSupport
|
||||
self.entityPreloader = EntityPreloader(pool: pool, ndb: ndb)
|
||||
}
|
||||
|
||||
// MARK: - Subscribing and Streaming data from Nostr
|
||||
|
||||
/// Streams notes until the EOSE signal
|
||||
func streamExistingEvents(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
|
||||
func streamExistingEvents(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, preloadStrategy: PreloadStrategy? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
|
||||
let timeout = timeout ?? .seconds(10)
|
||||
return AsyncStream<NdbNoteLender> { continuation in
|
||||
let streamingTask = Task {
|
||||
outerLoop: for await item in self.advancedStream(filters: filters, to: desiredRelays, timeout: timeout, streamMode: streamMode, id: id) {
|
||||
outerLoop: for await item in self.advancedStream(filters: filters, to: desiredRelays, timeout: timeout, streamMode: streamMode, preloadStrategy: preloadStrategy, id: id) {
|
||||
try Task.checkCancellation()
|
||||
switch item {
|
||||
case .event(let lender):
|
||||
@@ -64,10 +66,10 @@ extension NostrNetworkManager {
|
||||
/// Subscribes to data from user's relays, for a maximum period of time — after which the stream will end.
|
||||
///
|
||||
/// This is useful when waiting for some specific data from Nostr, but not indefinitely.
|
||||
func timedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
|
||||
func timedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration, streamMode: StreamMode? = nil, preloadStrategy: PreloadStrategy? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
|
||||
return AsyncStream<NdbNoteLender> { continuation in
|
||||
let streamingTask = Task {
|
||||
for await item in self.advancedStream(filters: filters, to: desiredRelays, timeout: timeout, streamMode: streamMode, id: id) {
|
||||
for await item in self.advancedStream(filters: filters, to: desiredRelays, timeout: timeout, streamMode: streamMode, preloadStrategy: preloadStrategy, id: id) {
|
||||
try Task.checkCancellation()
|
||||
switch item {
|
||||
case .event(lender: let lender):
|
||||
@@ -88,10 +90,10 @@ extension NostrNetworkManager {
|
||||
/// Subscribes to notes indefinitely
|
||||
///
|
||||
/// This is useful when simply streaming all events indefinitely
|
||||
func streamIndefinitely(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
|
||||
func streamIndefinitely(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, streamMode: StreamMode? = nil, preloadStrategy: PreloadStrategy? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
|
||||
return AsyncStream<NdbNoteLender> { continuation in
|
||||
let streamingTask = Task {
|
||||
for await item in self.advancedStream(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) {
|
||||
for await item in self.advancedStream(filters: filters, to: desiredRelays, streamMode: streamMode, preloadStrategy: preloadStrategy, id: id) {
|
||||
try Task.checkCancellation()
|
||||
switch item {
|
||||
case .event(lender: let lender):
|
||||
@@ -111,9 +113,10 @@ extension NostrNetworkManager {
|
||||
}
|
||||
}
|
||||
|
||||
func advancedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
|
||||
func advancedStream(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, streamMode: StreamMode? = nil, preloadStrategy: PreloadStrategy? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
|
||||
let id = id ?? UUID()
|
||||
let streamMode = streamMode ?? defaultStreamMode()
|
||||
let preloadStrategy = preloadStrategy ?? self.defaultPreloadingMode()
|
||||
return AsyncStream<StreamItem> { continuation in
|
||||
let timeoutTask = Task {
|
||||
guard let timeout else { return }
|
||||
@@ -163,6 +166,10 @@ extension NostrNetworkManager {
|
||||
switch item {
|
||||
case .event(let lender):
|
||||
logStreamPipelineStats("SubscriptionManager_Advanced_Stream_\(id)", "Consumer_\(id)")
|
||||
// Preload entities if requested
|
||||
if case .preload = preloadStrategy {
|
||||
self.entityPreloader.preload(note: lender)
|
||||
}
|
||||
continuation.yield(item)
|
||||
case .eose:
|
||||
break // Should not happen
|
||||
@@ -201,6 +208,10 @@ extension NostrNetworkManager {
|
||||
}
|
||||
negentropyStorageVector.unsealAndInsert(nostrEvent: event)
|
||||
})
|
||||
// Preload entities if requested
|
||||
if case .preload = preloadStrategy {
|
||||
self.entityPreloader.preload(note: lender)
|
||||
}
|
||||
continuation.yield(item)
|
||||
case .eose:
|
||||
break // Should not happen
|
||||
@@ -392,6 +403,10 @@ extension NostrNetworkManager {
|
||||
self.experimentalLocalRelayModelSupport ? .ndbFirst(networkOptimization: nil) : .ndbAndNetworkParallel(networkOptimization: nil)
|
||||
}
|
||||
|
||||
private func defaultPreloadingMode() -> PreloadStrategy {
|
||||
return .preload
|
||||
}
|
||||
|
||||
// MARK: - Finding specific data from Nostr
|
||||
|
||||
/// Finds a non-replaceable event based on a note ID
|
||||
@@ -495,6 +510,14 @@ extension NostrNetworkManager {
|
||||
|
||||
// MARK: - Task management
|
||||
|
||||
func startPreloader() async {
|
||||
await self.entityPreloader.start()
|
||||
}
|
||||
|
||||
func stopPreloader() async {
|
||||
await self.entityPreloader.stop()
|
||||
}
|
||||
|
||||
func cancelAllTasks() async {
|
||||
await self.taskManager.cancelAllTasks()
|
||||
}
|
||||
@@ -628,4 +651,12 @@ extension NostrNetworkManager {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Controls whether notes flowing through a stream are handed to the `EntityPreloader`.
enum PreloadStrategy {
    /// Do not preload: notes on this stream are never forwarded to the `EntityPreloader`.
    case noPreloading
    /// Forward each note so the profile metadata of its author and referenced users gets preloaded.
    case preload
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user