Move most of RelayPool away from the Main Thread

This is a large refactor that aims to improve performance by offloading
RelayPool computations into a separate actor outside the main thread.

This should reduce congestion on the main thread and thus improve UI
performance.

Also, the internal subscription callback mechanism was changed to use
AsyncStreams to prevent race conditions newly found in that area of the
code.

Changelog-Fixed: Added performance improvements to timeline scrolling
Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
Daniel D’Aquino
2025-10-10 14:12:30 -07:00
parent 7c1594107f
commit 991a4a86e6
50 changed files with 602 additions and 451 deletions

View File

@@ -48,13 +48,13 @@ final class RelayConnection: ObservableObject {
private lazy var socket = WebSocket(relay_url.url)
private var subscriptionToken: AnyCancellable?
private var handleEvent: (NostrConnectionEvent) -> ()
private var handleEvent: (NostrConnectionEvent) async -> ()
private var processEvent: (WebSocketEvent) -> ()
private let relay_url: RelayURL
var log: RelayLog?
init(url: RelayURL,
handleEvent: @escaping (NostrConnectionEvent) -> (),
handleEvent: @escaping (NostrConnectionEvent) async -> (),
processUnverifiedWSEvent: @escaping (WebSocketEvent) -> ())
{
self.relay_url = url
@@ -95,12 +95,12 @@ final class RelayConnection: ObservableObject {
.sink { [weak self] completion in
switch completion {
case .failure(let error):
self?.receive(event: .error(error))
Task { await self?.receive(event: .error(error)) }
case .finished:
self?.receive(event: .disconnected(.normalClosure, nil))
Task { await self?.receive(event: .disconnected(.normalClosure, nil)) }
}
} receiveValue: { [weak self] event in
self?.receive(event: event)
Task { await self?.receive(event: event) }
}
socket.connect()
@@ -138,7 +138,7 @@ final class RelayConnection: ObservableObject {
}
}
private func receive(event: WebSocketEvent) {
private func receive(event: WebSocketEvent) async {
assert(!Thread.isMainThread, "This code must not be executed on the main thread")
processEvent(event)
switch event {
@@ -149,7 +149,7 @@ final class RelayConnection: ObservableObject {
self.isConnecting = false
}
case .message(let message):
self.receive(message: message)
await self.receive(message: message)
case .disconnected(let closeCode, let reason):
if closeCode != .normalClosure {
Log.error("⚠️ Warning: RelayConnection (%d) closed with code: %s", for: .networking, String(describing: closeCode), String(describing: reason))
@@ -176,10 +176,8 @@ final class RelayConnection: ObservableObject {
self.reconnect_with_backoff()
}
}
DispatchQueue.main.async {
guard let ws_connection_event = NostrConnectionEvent.WSConnectionEvent.from(full_ws_event: event) else { return }
self.handleEvent(.ws_connection_event(ws_connection_event))
}
guard let ws_connection_event = NostrConnectionEvent.WSConnectionEvent.from(full_ws_event: event) else { return }
await self.handleEvent(.ws_connection_event(ws_connection_event))
if let description = event.description {
log?.add(description)
@@ -213,21 +211,19 @@ final class RelayConnection: ObservableObject {
}
}
private func receive(message: URLSessionWebSocketTask.Message) {
private func receive(message: URLSessionWebSocketTask.Message) async {
switch message {
case .string(let messageString):
// NOTE: Once we switch to the local relay model,
// we will not need to verify nostr events at this point.
if let ev = decode_and_verify_nostr_response(txt: messageString) {
DispatchQueue.main.async {
self.handleEvent(.nostr_event(ev))
}
await self.handleEvent(.nostr_event(ev))
return
}
print("failed to decode event \(messageString)")
case .data(let messageData):
if let messageString = String(data: messageData, encoding: .utf8) {
receive(message: .string(messageString))
await receive(message: .string(messageString))
}
@unknown default:
print("An unexpected URLSessionWebSocketTask.Message was received.")

View File

@@ -12,7 +12,7 @@ struct RelayHandler {
let sub_id: String
let filters: [NostrFilter]?
let to: [RelayURL]?
var callback: (RelayURL, NostrConnectionEvent) -> ()
var handler: AsyncStream<(RelayURL, NostrConnectionEvent)>.Continuation
}
struct QueuedRequest {
@@ -27,7 +27,8 @@ struct SeenEvent: Hashable {
}
/// Establishes and manages connections and subscriptions to a list of relays.
class RelayPool {
actor RelayPool {
@MainActor
private(set) var relays: [Relay] = []
var open: Bool = false
var handlers: [RelayHandler] = []
@@ -50,65 +51,86 @@ class RelayPool {
/// This is to avoid error states and undefined behaviour related to hitting subscription limits on the relays, by letting those wait instead with the principle that although slower is not ideal, it is better than completely broken.
static let MAX_CONCURRENT_SUBSCRIPTION_LIMIT = 14 // This number is only an educated guess based on some local experiments.
func close() {
disconnect()
relays = []
func close() async {
await disconnect()
await clearRelays()
open = false
handlers = []
request_queue = []
seen.removeAll()
await clearSeen()
counts = [:]
keypair = nil
}
@MainActor
private func clearRelays() {
relays = []
}
private func clearSeen() {
seen.removeAll()
}
init(ndb: Ndb, keypair: Keypair? = nil) {
self.ndb = ndb
self.keypair = keypair
network_monitor.pathUpdateHandler = { [weak self] path in
if (path.status == .satisfied || path.status == .requiresConnection) && self?.last_network_status != path.status {
DispatchQueue.main.async {
self?.connect_to_disconnected()
}
}
if let self, path.status != self.last_network_status {
for relay in self.relays {
relay.connection.log?.add("Network state: \(path.status)")
}
}
self?.last_network_status = path.status
Task { await self?.pathUpdateHandler(path: path) }
}
network_monitor.start(queue: network_monitor_queue)
}
private func pathUpdateHandler(path: NWPath) async {
if (path.status == .satisfied || path.status == .requiresConnection) && self.last_network_status != path.status {
await self.connect_to_disconnected()
}
if path.status != self.last_network_status {
for relay in await self.relays {
relay.connection.log?.add("Network state: \(path.status)")
}
}
self.last_network_status = path.status
}
@MainActor
var our_descriptors: [RelayDescriptor] {
return all_descriptors.filter { d in !d.ephemeral }
}
@MainActor
var all_descriptors: [RelayDescriptor] {
relays.map { r in r.descriptor }
}
@MainActor
var num_connected: Int {
return relays.reduce(0) { n, r in n + (r.connection.isConnected ? 1 : 0) }
}
func remove_handler(sub_id: String) {
self.handlers = handlers.filter { $0.sub_id != sub_id }
self.handlers = handlers.filter {
if $0.sub_id != sub_id {
return true
}
else {
$0.handler.finish()
return false
}
}
Log.debug("Removing %s handler, current: %d", for: .networking, sub_id, handlers.count)
}
func ping() {
Log.info("Pinging %d relays", for: .networking, relays.count)
for relay in relays {
func ping() async {
Log.info("Pinging %d relays", for: .networking, await relays.count)
for relay in await relays {
relay.connection.ping()
}
}
@MainActor
func register_handler(sub_id: String, filters: [NostrFilter]?, to relays: [RelayURL]? = nil, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) async {
func register_handler(sub_id: String, filters: [NostrFilter]?, to relays: [RelayURL]? = nil, handler: AsyncStream<(RelayURL, NostrConnectionEvent)>.Continuation) async {
while handlers.count > Self.MAX_CONCURRENT_SUBSCRIPTION_LIMIT {
Log.debug("%s: Too many subscriptions, waiting for subscription pool to clear", for: .networking, sub_id)
try? await Task.sleep(for: .seconds(1))
@@ -117,20 +139,22 @@ class RelayPool {
handlers = handlers.filter({ handler in
if handler.sub_id == sub_id {
Log.error("Duplicate handler detected for the same subscription ID. Overriding.", for: .networking)
handler.handler.finish()
return false
}
else {
return true
}
})
self.handlers.append(RelayHandler(sub_id: sub_id, filters: filters, to: relays, callback: handler))
self.handlers.append(RelayHandler(sub_id: sub_id, filters: filters, to: relays, handler: handler))
Log.debug("Registering %s handler, current: %d", for: .networking, sub_id, self.handlers.count)
}
func remove_relay(_ relay_id: RelayURL) {
@MainActor
func remove_relay(_ relay_id: RelayURL) async {
var i: Int = 0
self.disconnect(to: [relay_id])
await self.disconnect(to: [relay_id])
for relay in relays {
if relay.id == relay_id {
@@ -143,13 +167,13 @@ class RelayPool {
}
}
func add_relay(_ desc: RelayDescriptor) throws(RelayError) {
func add_relay(_ desc: RelayDescriptor) async throws(RelayError) {
let relay_id = desc.url
if get_relay(relay_id) != nil {
if await get_relay(relay_id) != nil {
throw RelayError.RelayAlreadyExists
}
let conn = RelayConnection(url: desc.url, handleEvent: { event in
self.handle_event(relay_id: relay_id, event: event)
await self.handle_event(relay_id: relay_id, event: event)
}, processUnverifiedWSEvent: { wsev in
guard case .message(let msg) = wsev,
case .string(let str) = msg
@@ -159,19 +183,24 @@ class RelayPool {
self.message_received_function?((str, desc))
})
let relay = Relay(descriptor: desc, connection: conn)
await self.appendRelayToList(relay: relay)
}
@MainActor
private func appendRelayToList(relay: Relay) {
self.relays.append(relay)
}
func setLog(_ log: RelayLog, for relay_id: RelayURL) {
func setLog(_ log: RelayLog, for relay_id: RelayURL) async {
// add the current network state to the log
log.add("Network state: \(network_monitor.currentPath.status)")
get_relay(relay_id)?.connection.log = log
await get_relay(relay_id)?.connection.log = log
}
/// This is used to retry dead connections
func connect_to_disconnected() {
for relay in relays {
func connect_to_disconnected() async {
for relay in await relays {
let c = relay.connection
let is_connecting = c.isConnecting
@@ -188,16 +217,16 @@ class RelayPool {
}
}
func reconnect(to: [RelayURL]? = nil) {
let relays = to.map{ get_relays($0) } ?? self.relays
func reconnect(to targetRelays: [RelayURL]? = nil) async {
let relays = await getRelays(targetRelays: targetRelays)
for relay in relays {
// don't try to reconnect to broken relays
relay.connection.reconnect()
}
}
func connect(to: [RelayURL]? = nil) {
let relays = to.map{ get_relays($0) } ?? self.relays
func connect(to targetRelays: [RelayURL]? = nil) async {
let relays = await getRelays(targetRelays: targetRelays)
for relay in relays {
relay.connection.connect()
}
@@ -205,15 +234,20 @@ class RelayPool {
open = true
}
func disconnect(to: [RelayURL]? = nil) {
func disconnect(to targetRelays: [RelayURL]? = nil) async {
// Mark as closed first, to prevent other classes from pulling data while the relays are being disconnected
open = false
let relays = to.map{ get_relays($0) } ?? self.relays
let relays = await getRelays(targetRelays: targetRelays)
for relay in relays {
relay.connection.disconnect()
}
}
@MainActor
func getRelays(targetRelays: [RelayURL]? = nil) -> [Relay] {
targetRelays.map{ get_relays($0) } ?? self.relays
}
/// Deletes queued up requests that should not persist between app sessions (i.e. when the app goes to background then back to foreground)
func cleanQueuedRequestForSessionEnd() {
request_queue = request_queue.filter { request in
@@ -231,14 +265,14 @@ class RelayPool {
}
}
func unsubscribe(sub_id: String, to: [RelayURL]? = nil) {
func unsubscribe(sub_id: String, to: [RelayURL]? = nil) async {
if to == nil {
self.remove_handler(sub_id: sub_id)
}
self.send(.unsubscribe(sub_id), to: to)
await self.send(.unsubscribe(sub_id), to: to)
}
func subscribe(sub_id: String, filters: [NostrFilter], handler: @escaping (RelayURL, NostrConnectionEvent) -> (), to: [RelayURL]? = nil) {
func subscribe(sub_id: String, filters: [NostrFilter], handler: AsyncStream<(RelayURL, NostrConnectionEvent)>.Continuation, to: [RelayURL]? = nil) {
Task {
await register_handler(sub_id: sub_id, filters: filters, to: to, handler: handler)
@@ -246,7 +280,7 @@ class RelayPool {
// When the caller specifies specific relays, do not skip ephemeral relays to respect the exact list given by the caller.
let shouldSkipEphemeralRelays = to == nil ? true : false
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to, skip_ephemeral: shouldSkipEphemeralRelays)
await send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to, skip_ephemeral: shouldSkipEphemeralRelays)
}
}
@@ -257,9 +291,9 @@ class RelayPool {
/// - desiredRelays: The desired relays which to subsctibe to. If `nil`, it defaults to the `RelayPool`'s default list
/// - eoseTimeout: The maximum timeout which to give up waiting for the eoseSignal
/// - Returns: Returns an async stream that callers can easily consume via a for-loop
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, eoseTimeout: Duration? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, eoseTimeout: Duration? = nil, id: UUID? = nil) async -> AsyncStream<StreamItem> {
let eoseTimeout = eoseTimeout ?? .seconds(5)
let desiredRelays = desiredRelays ?? self.relays.map({ $0.descriptor.url })
let desiredRelays = await getRelays(targetRelays: desiredRelays)
let startTime = CFAbsoluteTimeGetCurrent()
return AsyncStream<StreamItem> { continuation in
let id = id ?? UUID()
@@ -267,34 +301,40 @@ class RelayPool {
var seenEvents: Set<NoteId> = []
var relaysWhoFinishedInitialResults: Set<RelayURL> = []
var eoseSent = false
self.subscribe(sub_id: sub_id, filters: filters, handler: { (relayUrl, connectionEvent) in
switch connectionEvent {
case .ws_connection_event(let ev):
// Websocket events such as connect/disconnect/error are already handled in `RelayConnection`. Do not perform any handling here.
// For the future, perhaps we should abstract away `.ws_connection_event` in `RelayPool`? Seems like something to be handled on the `RelayConnection` layer.
break
case .nostr_event(let nostrResponse):
guard nostrResponse.subid == sub_id else { return } // Do not stream items that do not belong in this subscription
switch nostrResponse {
case .event(_, let nostrEvent):
if seenEvents.contains(nostrEvent.id) { break } // Don't send two of the same events.
continuation.yield(with: .success(.event(nostrEvent)))
seenEvents.insert(nostrEvent.id)
case .notice(let note):
break // We do not support handling these yet
case .eose(_):
relaysWhoFinishedInitialResults.insert(relayUrl)
let desiredAndConnectedRelays = desiredRelays ?? self.relays.filter({ $0.connection.isConnected }).map({ $0.descriptor.url })
Log.debug("RelayPool subscription %s: EOSE from %s. EOSE count: %d/%d. Elapsed: %.2f seconds.", for: .networking, id.uuidString, relayUrl.absoluteString, relaysWhoFinishedInitialResults.count, Set(desiredAndConnectedRelays).count, CFAbsoluteTimeGetCurrent() - startTime)
if relaysWhoFinishedInitialResults == Set(desiredAndConnectedRelays) {
continuation.yield(with: .success(.eose))
eoseSent = true
let upstreamStream = AsyncStream<(RelayURL, NostrConnectionEvent)> { upstreamContinuation in
self.subscribe(sub_id: sub_id, filters: filters, handler: upstreamContinuation, to: desiredRelays.map({ $0.descriptor.url }))
}
let upstreamStreamingTask = Task {
for await (relayUrl, connectionEvent) in upstreamStream {
try Task.checkCancellation()
switch connectionEvent {
case .ws_connection_event(let ev):
// Websocket events such as connect/disconnect/error are already handled in `RelayConnection`. Do not perform any handling here.
// For the future, perhaps we should abstract away `.ws_connection_event` in `RelayPool`? Seems like something to be handled on the `RelayConnection` layer.
break
case .nostr_event(let nostrResponse):
guard nostrResponse.subid == sub_id else { return } // Do not stream items that do not belong in this subscription
switch nostrResponse {
case .event(_, let nostrEvent):
if seenEvents.contains(nostrEvent.id) { break } // Don't send two of the same events.
continuation.yield(with: .success(.event(nostrEvent)))
seenEvents.insert(nostrEvent.id)
case .notice(let note):
break // We do not support handling these yet
case .eose(_):
relaysWhoFinishedInitialResults.insert(relayUrl)
let desiredAndConnectedRelays = desiredRelays.filter({ $0.connection.isConnected }).map({ $0.descriptor.url })
Log.debug("RelayPool subscription %s: EOSE from %s. EOSE count: %d/%d. Elapsed: %.2f seconds.", for: .networking, id.uuidString, relayUrl.absoluteString, relaysWhoFinishedInitialResults.count, Set(desiredAndConnectedRelays).count, CFAbsoluteTimeGetCurrent() - startTime)
if relaysWhoFinishedInitialResults == Set(desiredAndConnectedRelays) {
continuation.yield(with: .success(.eose))
eoseSent = true
}
case .ok(_): break // No need to handle this, we are not sending an event to the relay
case .auth(_): break // Handled in a separate function in RelayPool
}
case .ok(_): break // No need to handle this, we are not sending an event to the relay
case .auth(_): break // Handled in a separate function in RelayPool
}
}
}, to: desiredRelays)
}
let timeoutTask = Task {
try? await Task.sleep(for: eoseTimeout)
if !eoseSent { continuation.yield(with: .success(.eose)) }
@@ -308,9 +348,12 @@ class RelayPool {
@unknown default:
break
}
self.unsubscribe(sub_id: sub_id, to: desiredRelays)
self.remove_handler(sub_id: sub_id)
Task {
await self.unsubscribe(sub_id: sub_id, to: desiredRelays.map({ $0.descriptor.url }))
await self.remove_handler(sub_id: sub_id)
}
timeoutTask.cancel()
upstreamStreamingTask.cancel()
}
}
}
@@ -322,11 +365,11 @@ class RelayPool {
case eose
}
func subscribe_to(sub_id: String, filters: [NostrFilter], to: [RelayURL]?, handler: @escaping (RelayURL, NostrConnectionEvent) -> ()) {
func subscribe_to(sub_id: String, filters: [NostrFilter], to: [RelayURL]?, handler: AsyncStream<(RelayURL, NostrConnectionEvent)>.Continuation) {
Task {
await register_handler(sub_id: sub_id, filters: filters, to: to, handler: handler)
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
await send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
}
}
@@ -341,7 +384,6 @@ class RelayPool {
return c
}
@MainActor
func queue_req(r: NostrRequestType, relay: RelayURL, skip_ephemeral: Bool) {
let count = count_queued(relay: relay)
guard count <= 10 else {
@@ -365,8 +407,8 @@ class RelayPool {
}
}
func send_raw(_ req: NostrRequestType, to: [RelayURL]? = nil, skip_ephemeral: Bool = true) {
let relays = to.map{ get_relays($0) } ?? self.relays
func send_raw(_ req: NostrRequestType, to: [RelayURL]? = nil, skip_ephemeral: Bool = true) async {
let relays = await getRelays(targetRelays: to)
self.send_raw_to_local_ndb(req) // Always send Nostr events and data to NostrDB for a local copy
@@ -394,15 +436,17 @@ class RelayPool {
}
}
func send(_ req: NostrRequest, to: [RelayURL]? = nil, skip_ephemeral: Bool = true) {
send_raw(.typical(req), to: to, skip_ephemeral: skip_ephemeral)
func send(_ req: NostrRequest, to: [RelayURL]? = nil, skip_ephemeral: Bool = true) async {
await send_raw(.typical(req), to: to, skip_ephemeral: skip_ephemeral)
}
@MainActor
func get_relays(_ ids: [RelayURL]) -> [Relay] {
// don't include ephemeral relays in the default list to query
relays.filter { ids.contains($0.id) }
}
@MainActor
func get_relay(_ id: RelayURL) -> Relay? {
relays.first(where: { $0.id == id })
}
@@ -415,7 +459,7 @@ class RelayPool {
}
print("running queueing request: \(req.req) for \(relay_id)")
self.send_raw(req.req, to: [relay_id], skip_ephemeral: false)
Task { await self.send_raw(req.req, to: [relay_id], skip_ephemeral: false) }
}
}
@@ -432,7 +476,7 @@ class RelayPool {
}
}
func resubscribeAll(relayId: RelayURL) {
func resubscribeAll(relayId: RelayURL) async {
for handler in self.handlers {
guard let filters = handler.filters else { continue }
// When the caller specifies no relays, it is implied that the user wants to use the ones in the user relay list. Skip ephemeral relays in that case.
@@ -446,11 +490,11 @@ class RelayPool {
}
Log.debug("%s: Sending resubscribe request to %s", for: .networking, handler.sub_id, relayId.absoluteString)
send(.subscribe(.init(filters: filters, sub_id: handler.sub_id)), to: [relayId], skip_ephemeral: shouldSkipEphemeralRelays)
await send(.subscribe(.init(filters: filters, sub_id: handler.sub_id)), to: [relayId], skip_ephemeral: shouldSkipEphemeralRelays)
}
}
func handle_event(relay_id: RelayURL, event: NostrConnectionEvent) {
func handle_event(relay_id: RelayURL, event: NostrConnectionEvent) async {
record_seen(relay_id: relay_id, event: event)
// When we reconnect, do two things
@@ -459,20 +503,20 @@ class RelayPool {
if case .ws_connection_event(let ws) = event {
if case .connected = ws {
run_queue(relay_id)
self.resubscribeAll(relayId: relay_id)
await self.resubscribeAll(relayId: relay_id)
}
}
// Handle auth
if case let .nostr_event(nostrResponse) = event,
case let .auth(challenge_string) = nostrResponse {
if let relay = get_relay(relay_id) {
if let relay = await get_relay(relay_id) {
print("received auth request from \(relay.descriptor.url.id)")
relay.authentication_state = .pending
if let keypair {
if let fullKeypair = keypair.to_full() {
if let authRequest = make_auth_request(keypair: fullKeypair, challenge_string: challenge_string, relay: relay) {
send(.auth(authRequest), to: [relay_id], skip_ephemeral: false)
await send(.auth(authRequest), to: [relay_id], skip_ephemeral: false)
relay.authentication_state = .verified
} else {
print("failed to make auth request")
@@ -491,13 +535,13 @@ class RelayPool {
}
for handler in handlers {
handler.callback(relay_id, event)
handler.handler.yield((relay_id, event))
}
}
}
func add_rw_relay(_ pool: RelayPool, _ url: RelayURL) {
try? pool.add_relay(RelayPool.RelayDescriptor(url: url, info: .readWrite))
func add_rw_relay(_ pool: RelayPool, _ url: RelayURL) async {
try? await pool.add_relay(RelayPool.RelayDescriptor(url: url, info: .readWrite))
}