Improve loading speed on home timeline
This commit improves the loading speed for the home timeline (and likely other areas of the app) by employing various techniques and changes: - Network EOSE timeout reduced from 10 seconds down to 5 seconds - Network EOSE does not wait on relays with broken connections - Offload HomeModel handler event processing to separate tasks to avoid a large backlog - Give SubscriptionManager streamers more fine-grained EOSE signals for local optimization - Only wait for Ndb EOSE on the home timeline for faster loading - Add logging with time elapsed measurements for easier identification of loading problems Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
@@ -18,6 +18,8 @@ extension NostrNetworkManager {
|
||||
private var taskManager: TaskManager
|
||||
private let experimentalLocalRelayModelSupport: Bool
|
||||
|
||||
let EXTRA_VERBOSE_LOGGING: Bool = false
|
||||
|
||||
init(pool: RelayPool, ndb: Ndb, experimentalLocalRelayModelSupport: Bool) {
|
||||
self.pool = pool
|
||||
self.ndb = ndb
|
||||
@@ -28,17 +30,21 @@ extension NostrNetworkManager {
|
||||
// MARK: - Subscribing and Streaming data from Nostr
|
||||
|
||||
/// Streams notes until the EOSE signal
|
||||
func streamNotesUntilEndOfStoredEvents(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil) -> AsyncStream<NdbNoteLender> {
|
||||
func streamNotesUntilEndOfStoredEvents(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration? = nil, id: UUID? = nil) -> AsyncStream<NdbNoteLender> {
|
||||
let timeout = timeout ?? .seconds(10)
|
||||
return AsyncStream<NdbNoteLender> { continuation in
|
||||
let streamingTask = Task {
|
||||
outerLoop: for await item in self.subscribe(filters: filters, to: desiredRelays, timeout: timeout) {
|
||||
outerLoop: for await item in self.subscribe(filters: filters, to: desiredRelays, timeout: timeout, id: id) {
|
||||
try Task.checkCancellation()
|
||||
switch item {
|
||||
case .event(let lender):
|
||||
continuation.yield(lender)
|
||||
case .eose:
|
||||
break outerLoop
|
||||
case .ndbEose:
|
||||
continue
|
||||
case .networkEose:
|
||||
continue
|
||||
}
|
||||
}
|
||||
continuation.finish()
|
||||
@@ -52,10 +58,10 @@ extension NostrNetworkManager {
|
||||
/// Subscribes to data from user's relays, for a maximum period of time — after which the stream will end.
|
||||
///
|
||||
/// This is useful when waiting for some specific data from Nostr, but not indefinitely.
|
||||
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration) -> AsyncStream<StreamItem> {
|
||||
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, timeout: Duration, id: UUID? = nil) -> AsyncStream<StreamItem> {
|
||||
return AsyncStream<StreamItem> { continuation in
|
||||
let streamingTask = Task {
|
||||
for await item in self.subscribe(filters: filters, to: desiredRelays) {
|
||||
for await item in self.subscribe(filters: filters, to: desiredRelays, id: id) {
|
||||
try Task.checkCancellation()
|
||||
continuation.yield(item)
|
||||
}
|
||||
@@ -79,9 +85,10 @@ extension NostrNetworkManager {
|
||||
///
|
||||
/// - Parameter filters: The nostr filters to specify what kind of data to subscribe to
|
||||
/// - Returns: An async stream of nostr data
|
||||
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream<StreamItem> {
|
||||
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
|
||||
return AsyncStream<StreamItem> { continuation in
|
||||
let subscriptionId = UUID()
|
||||
let subscriptionId = id ?? UUID()
|
||||
let startTime = CFAbsoluteTimeGetCurrent()
|
||||
Log.info("Starting subscription %s: %s", for: .subscription_manager, subscriptionId.uuidString, filters.debugDescription)
|
||||
let multiSessionStreamingTask = Task {
|
||||
while !Task.isCancelled {
|
||||
@@ -97,7 +104,7 @@ extension NostrNetworkManager {
|
||||
continue
|
||||
}
|
||||
Log.info("%s: Streaming.", for: .subscription_manager, subscriptionId.uuidString)
|
||||
for await item in self.sessionSubscribe(filters: filters, to: desiredRelays) {
|
||||
for await item in self.sessionSubscribe(filters: filters, to: desiredRelays, id: id) {
|
||||
try Task.checkCancellation()
|
||||
continuation.yield(item)
|
||||
}
|
||||
@@ -127,8 +134,11 @@ extension NostrNetworkManager {
|
||||
///
|
||||
/// - Parameter filters: The nostr filters to specify what kind of data to subscribe to
|
||||
/// - Returns: An async stream of nostr data
|
||||
private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil) -> AsyncStream<StreamItem> {
|
||||
private func sessionSubscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
|
||||
let id = id ?? UUID()
|
||||
return AsyncStream<StreamItem> { continuation in
|
||||
let startTime = CFAbsoluteTimeGetCurrent()
|
||||
Log.debug("Session subscription %s: Started", for: .subscription_manager, id.uuidString)
|
||||
var ndbEOSEIssued = false
|
||||
var networkEOSEIssued = false
|
||||
|
||||
@@ -143,6 +153,7 @@ extension NostrNetworkManager {
|
||||
(ndbEOSEIssued && (networkEOSEIssued || !connectedToNetwork))
|
||||
|
||||
if canIssueEOSE {
|
||||
Log.debug("Session subscription %s: Issued EOSE for session. Elapsed: %.2f seconds", for: .subscription_manager, id.uuidString, CFAbsoluteTimeGetCurrent() - startTime)
|
||||
continuation.yield(.eose)
|
||||
}
|
||||
}
|
||||
@@ -153,7 +164,8 @@ extension NostrNetworkManager {
|
||||
try Task.checkCancellation()
|
||||
switch item {
|
||||
case .eose:
|
||||
Log.debug("Session subscribe: Received EOSE from nostrdb", for: .subscription_manager)
|
||||
Log.debug("Session subscription %s: Received EOSE from nostrdb. Elapsed: %.2f seconds", for: .subscription_manager, id.uuidString, CFAbsoluteTimeGetCurrent() - startTime)
|
||||
continuation.yield(.ndbEose)
|
||||
ndbEOSEIssued = true
|
||||
yieldEOSEIfReady()
|
||||
case .event(let noteKey):
|
||||
@@ -170,32 +182,35 @@ extension NostrNetworkManager {
|
||||
}
|
||||
}
|
||||
catch {
|
||||
Log.error("NDB streaming error: %s", for: .subscription_manager, error.localizedDescription)
|
||||
Log.error("Session subscription %s: NDB streaming error: %s", for: .subscription_manager, id.uuidString, error.localizedDescription)
|
||||
}
|
||||
continuation.finish()
|
||||
}
|
||||
let streamTask = Task {
|
||||
do {
|
||||
for await item in self.pool.subscribe(filters: filters, to: desiredRelays) {
|
||||
for await item in self.pool.subscribe(filters: filters, to: desiredRelays, id: id) {
|
||||
// NO-OP. Notes will be automatically ingested by NostrDB
|
||||
// TODO: Improve efficiency of subscriptions?
|
||||
try Task.checkCancellation()
|
||||
switch item {
|
||||
case .event(let event):
|
||||
Log.debug("Session subscribe: Received kind %d event with id %s from the network", for: .subscription_manager, event.kind, event.id.hex())
|
||||
if EXTRA_VERBOSE_LOGGING {
|
||||
Log.debug("Session subscription %s: Received kind %d event with id %s from the network", for: .subscription_manager, id.uuidString, event.kind, event.id.hex())
|
||||
}
|
||||
if !self.experimentalLocalRelayModelSupport {
|
||||
// In normal mode (non-experimental), we stream from ndb but also directly from the network
|
||||
continuation.yield(.event(lender: NdbNoteLender(ownedNdbNote: event)))
|
||||
}
|
||||
case .eose:
|
||||
Log.debug("Session subscribe: Received EOSE from the network", for: .subscription_manager)
|
||||
Log.debug("Session subscription %s: Received EOSE from the network. Elapsed: %.2f seconds", for: .subscription_manager, id.uuidString, CFAbsoluteTimeGetCurrent() - startTime)
|
||||
continuation.yield(.networkEose)
|
||||
networkEOSEIssued = true
|
||||
yieldEOSEIfReady()
|
||||
}
|
||||
}
|
||||
}
|
||||
catch {
|
||||
Log.error("Network streaming error: %s", for: .subscription_manager, error.localizedDescription)
|
||||
Log.error("Session subscription %s: Network streaming error: %s", for: .subscription_manager, id.uuidString, error.localizedDescription)
|
||||
}
|
||||
continuation.finish()
|
||||
}
|
||||
@@ -348,7 +363,27 @@ extension NostrNetworkManager {
|
||||
enum StreamItem {
|
||||
/// An event which can be borrowed from NostrDB
|
||||
case event(lender: NdbNoteLender)
|
||||
/// The end of stored events
|
||||
/// The canonical "end of stored events". See implementations of `subscribe` to see when this event is fired in relation to other EOSEs
|
||||
case eose
|
||||
/// "End of stored events" from NostrDB.
|
||||
case ndbEose
|
||||
/// "End of stored events" from all relays in `RelayPool`.
|
||||
case networkEose
|
||||
|
||||
var debugDescription: String {
|
||||
switch self {
|
||||
case .event(lender: let lender):
|
||||
let detailedDescription = try? lender.borrow({ event in
|
||||
"Note with ID: \(event.id.hex())"
|
||||
})
|
||||
return detailedDescription ?? "Some note"
|
||||
case .eose:
|
||||
return "EOSE"
|
||||
case .ndbEose:
|
||||
return "NDB EOSE"
|
||||
case .networkEose:
|
||||
return "NETWORK EOSE"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,6 +145,8 @@ extension NostrNetworkManager {
|
||||
try? self.set(userRelayList: relayList) // Set the validated list
|
||||
})
|
||||
case .eose: continue
|
||||
case .ndbEose: continue
|
||||
case .networkEose: continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,11 +232,13 @@ class RelayPool {
|
||||
/// - desiredRelays: The desired relays which to subsctibe to. If `nil`, it defaults to the `RelayPool`'s default list
|
||||
/// - eoseTimeout: The maximum timeout which to give up waiting for the eoseSignal
|
||||
/// - Returns: Returns an async stream that callers can easily consume via a for-loop
|
||||
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, eoseTimeout: Duration? = nil) -> AsyncStream<StreamItem> {
|
||||
let eoseTimeout = eoseTimeout ?? .seconds(10)
|
||||
let desiredRelays = desiredRelays ?? self.relays.map({ $0.descriptor.url })
|
||||
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, eoseTimeout: Duration? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
|
||||
let eoseTimeout = eoseTimeout ?? .seconds(5)
|
||||
let desiredRelays = desiredRelays ?? self.relays.filter({ $0.connection.isConnected }).map({ $0.descriptor.url })
|
||||
let startTime = CFAbsoluteTimeGetCurrent()
|
||||
return AsyncStream<StreamItem> { continuation in
|
||||
let sub_id = UUID().uuidString
|
||||
let id = id ?? UUID()
|
||||
let sub_id = id.uuidString
|
||||
var seenEvents: Set<NoteId> = []
|
||||
var relaysWhoFinishedInitialResults: Set<RelayURL> = []
|
||||
var eoseSent = false
|
||||
@@ -257,6 +259,7 @@ class RelayPool {
|
||||
break // We do not support handling these yet
|
||||
case .eose(_):
|
||||
relaysWhoFinishedInitialResults.insert(relayUrl)
|
||||
Log.debug("RelayPool subscription %s: EOSE from %s. EOSE count: %d/%d. Elapsed: %.2f seconds.", for: .networking, id.uuidString, relayUrl.absoluteString, relaysWhoFinishedInitialResults.count, Set(desiredRelays).count, CFAbsoluteTimeGetCurrent() - startTime)
|
||||
if relaysWhoFinishedInitialResults == Set(desiredRelays) {
|
||||
continuation.yield(with: .success(.eose))
|
||||
eoseSent = true
|
||||
|
||||
Reference in New Issue
Block a user