diff --git a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift index 86fa8b79..4a7eaf98 100644 --- a/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/SubscriptionManager.swift @@ -5,6 +5,8 @@ // Created by Daniel D’Aquino on 2025-03-25. // import Foundation +import os + extension NostrNetworkManager { /// Reads or fetches information from RelayPool and NostrDB, and provides an easier and unified higher-level interface. @@ -18,6 +20,11 @@ extension NostrNetworkManager { private var taskManager: TaskManager private let experimentalLocalRelayModelSupport: Bool + private static let logger = Logger( + subsystem: Constants.MAIN_APP_BUNDLE_IDENTIFIER, + category: "subscription_manager" + ) + let EXTRA_VERBOSE_LOGGING: Bool = false init(pool: RelayPool, ndb: Ndb, experimentalLocalRelayModelSupport: Bool) { @@ -110,33 +117,33 @@ extension NostrNetworkManager { return AsyncStream { continuation in let subscriptionId = id ?? UUID() let startTime = CFAbsoluteTimeGetCurrent() - Log.info("Starting subscription %s: %s", for: .subscription_manager, subscriptionId.uuidString, filters.debugDescription) + Self.logger.info("Starting subscription \(subscriptionId.uuidString, privacy: .public): \(filters.debugDescription, privacy: .private)") let multiSessionStreamingTask = Task { while !Task.isCancelled { do { guard !self.ndb.is_closed else { - Log.info("%s: Ndb closed. Sleeping for 1 second before resuming.", for: .subscription_manager, subscriptionId.uuidString) + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Ndb closed. Sleeping for 1 second before resuming.") try await Task.sleep(nanoseconds: 1_000_000_000) continue } guard self.pool.open else { - Log.info("%s: RelayPool closed. Sleeping for 1 second before resuming.", for: .subscription_manager, subscriptionId.uuidString) + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): RelayPool closed. Sleeping for 1 second before resuming.") try await Task.sleep(nanoseconds: 1_000_000_000) continue } - Log.info("%s: Streaming.", for: .subscription_manager, subscriptionId.uuidString) + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Streaming.") for await item in self.sessionSubscribe(filters: filters, to: desiredRelays, streamMode: streamMode, id: id) { try Task.checkCancellation() continuation.yield(item) } - Log.info("%s: Session subscription ended. Sleeping for 1 second before resuming.", for: .subscription_manager, subscriptionId.uuidString) + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Session subscription ended. Sleeping for 1 second before resuming.") try await Task.sleep(nanoseconds: 1_000_000_000) } catch { - Log.error("%s: Error: %s", for: .subscription_manager, subscriptionId.uuidString, error.localizedDescription) + Self.logger.error("Session subscription \(subscriptionId.uuidString, privacy: .public): Error: \(error.localizedDescription, privacy: .public)") } } - Log.info("%s: Terminated.", for: .subscription_manager, subscriptionId.uuidString) + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Terminated.") } let timeoutTask = Task { if let timeout { @@ -145,7 +152,7 @@ extension NostrNetworkManager { } } continuation.onTermination = { @Sendable _ in - Log.info("%s: Cancelled.", for: .subscription_manager, subscriptionId.uuidString) + Self.logger.info("\(subscriptionId.uuidString, privacy: .public): Cancelled.") multiSessionStreamingTask.cancel() timeoutTask.cancel() } @@ -167,7 +174,7 @@ extension NostrNetworkManager { let streamMode = streamMode ?? defaultStreamMode() return AsyncStream { continuation in let startTime = CFAbsoluteTimeGetCurrent() - Log.debug("Session subscription %s: Started", for: .subscription_manager, id.uuidString) + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Started") var ndbEOSEIssued = false var networkEOSEIssued = false @@ -182,7 +189,7 @@ extension NostrNetworkManager { } if canIssueEOSE { - Log.debug("Session subscription %s: Issued EOSE for session. Elapsed: %.2f seconds", for: .subscription_manager, id.uuidString, CFAbsoluteTimeGetCurrent() - startTime) + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Issued EOSE for session. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds") continuation.yield(.eose) } } @@ -193,7 +200,7 @@ extension NostrNetworkManager { try Task.checkCancellation() switch item { case .eose: - Log.debug("Session subscription %s: Received EOSE from nostrdb. Elapsed: %.2f seconds", for: .subscription_manager, id.uuidString, CFAbsoluteTimeGetCurrent() - startTime) + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from nostrdb. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds") continuation.yield(.ndbEose) ndbEOSEIssued = true yieldEOSEIfReady() @@ -211,7 +218,7 @@ extension NostrNetworkManager { } } catch { - Log.error("Session subscription %s: NDB streaming error: %s", for: .subscription_manager, id.uuidString, error.localizedDescription) + Self.logger.error("Session subscription \(id.uuidString, privacy: .public): NDB streaming error: \(error.localizedDescription, privacy: .public)") } continuation.finish() } @@ -224,7 +231,7 @@ extension NostrNetworkManager { switch item { case .event(let event): if EXTRA_VERBOSE_LOGGING { - Log.debug("Session subscription %s: Received kind %d event with id %s from the network", for: .subscription_manager, id.uuidString, event.kind, event.id.hex()) + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received kind \(event.kind, privacy: .public) event with id \(event.id.hex(), privacy: .private) from the network") } switch streamMode { case .ndbFirst: @@ -233,7 +240,7 @@ extension NostrNetworkManager { continuation.yield(.event(lender: NdbNoteLender(ownedNdbNote: event))) } case .eose: - Log.debug("Session subscription %s: Received EOSE from the network. Elapsed: %.2f seconds", for: .subscription_manager, id.uuidString, CFAbsoluteTimeGetCurrent() - startTime) + Self.logger.debug("Session subscription \(id.uuidString, privacy: .public): Received EOSE from the network. Elapsed: \(CFAbsoluteTimeGetCurrent() - startTime, format: .fixed(precision: 2), privacy: .public) seconds") continuation.yield(.networkEose) networkEOSEIssued = true yieldEOSEIfReady() @@ -241,7 +248,7 @@ extension NostrNetworkManager { } } catch { - Log.error("Session subscription %s: Network streaming error: %s", for: .subscription_manager, id.uuidString, error.localizedDescription) + Self.logger.error("Session subscription \(id.uuidString, privacy: .public): Network streaming error: \(error.localizedDescription, privacy: .public)") } continuation.finish() } @@ -373,6 +380,11 @@ extension NostrNetworkManager { actor TaskManager { private var tasks: [UUID: Task] = [:] + private static let logger = Logger( + subsystem: "com.jb55.damus", + category: "subscription_manager.task_manager" + ) + func add(task: Task) -> UUID { let taskId = UUID() self.tasks[taskId] = task @@ -387,11 +399,11 @@ extension NostrNetworkManager { } func cancelAllTasks() async { - await withTaskGroup { group in - Log.info("Cancelling all SubscriptionManager tasks", for: .subscription_manager) + await withTaskGroup { group in + Self.logger.info("Cancelling all SubscriptionManager tasks") // Start each task cancellation in parallel for faster execution for (taskId, _) in self.tasks { - Log.info("Cancelling SubscriptionManager task %s", for: .subscription_manager, taskId.uuidString) + Self.logger.info("Cancelling SubscriptionManager task \(taskId.uuidString, privacy: .public)") group.addTask { await self.cancelAndCleanUp(taskId: taskId) } @@ -400,7 +412,7 @@ extension NostrNetworkManager { for await value in group { continue } - Log.info("Cancelled all SubscriptionManager tasks", for: .subscription_manager) + Self.logger.info("Cancelled all SubscriptionManager tasks") } } }