Fix RelayPool connection race condition without time delays
This improves upon a temporary fix we had for the RelayPool race
condition that would cause timeline staleness.
The root cause was that during app launch, the HomeModel would subscribe
to some filters, and the subscribe function would filter out any relays
not yet connected to avoid unnecessary waiting for EOSEs from disconnected relays.
However, that filtering would cause the subscribe request to not be
queued up or sent back to the relays once connected, causing the relays
to never receive those subscription requests and causing timeline
staleness.
This was fixed by separating the relay list used for the subcription
request from the relay list used for waiting for network EOSEs. This
allows other mechanisms to ensure the subscription will go through even
when the app is initializing and relays are not yet fully connected.
Fixes: 61eb833239
Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
@@ -198,13 +198,6 @@ extension NostrNetworkManager {
|
||||
continue
|
||||
}
|
||||
|
||||
// FIXME: The delay below is to prevent race conditions when the RelayPool is initializing during the app start.
|
||||
// Without this, occasionally there is a race condition that causes the subscribe call to be missed somehow
|
||||
// despite mechanisms in place to queue up requests when relays are disconnected, as well as mechanisms to send subscribe requests when the relay is already connected.
|
||||
// This is difficult to fix as it will require a big refactor in `RelayPool` to implement proper async/await mechanisms, instead of the current "fire and forget" interfaces.
|
||||
// If this delay fixes the occasional timeline staleness when starting the app, it helps prove the hypothesis above.
|
||||
try await Task.sleep(nanoseconds: 2_000_000_000)
|
||||
|
||||
do {
|
||||
for await item in self.pool.subscribe(filters: filters, to: desiredRelays, id: id) {
|
||||
// NO-OP. Notes will be automatically ingested by NostrDB
|
||||
|
||||
@@ -259,7 +259,7 @@ class RelayPool {
|
||||
/// - Returns: Returns an async stream that callers can easily consume via a for-loop
|
||||
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, eoseTimeout: Duration? = nil, id: UUID? = nil) -> AsyncStream<StreamItem> {
|
||||
let eoseTimeout = eoseTimeout ?? .seconds(5)
|
||||
let desiredRelays = desiredRelays ?? self.relays.filter({ $0.connection.isConnected }).map({ $0.descriptor.url })
|
||||
let desiredRelays = desiredRelays ?? self.relays.map({ $0.descriptor.url })
|
||||
let startTime = CFAbsoluteTimeGetCurrent()
|
||||
return AsyncStream<StreamItem> { continuation in
|
||||
let id = id ?? UUID()
|
||||
@@ -284,8 +284,9 @@ class RelayPool {
|
||||
break // We do not support handling these yet
|
||||
case .eose(_):
|
||||
relaysWhoFinishedInitialResults.insert(relayUrl)
|
||||
Log.debug("RelayPool subscription %s: EOSE from %s. EOSE count: %d/%d. Elapsed: %.2f seconds.", for: .networking, id.uuidString, relayUrl.absoluteString, relaysWhoFinishedInitialResults.count, Set(desiredRelays).count, CFAbsoluteTimeGetCurrent() - startTime)
|
||||
if relaysWhoFinishedInitialResults == Set(desiredRelays) {
|
||||
let desiredAndConnectedRelays = desiredRelays ?? self.relays.filter({ $0.connection.isConnected }).map({ $0.descriptor.url })
|
||||
Log.debug("RelayPool subscription %s: EOSE from %s. EOSE count: %d/%d. Elapsed: %.2f seconds.", for: .networking, id.uuidString, relayUrl.absoluteString, relaysWhoFinishedInitialResults.count, Set(desiredAndConnectedRelays).count, CFAbsoluteTimeGetCurrent() - startTime)
|
||||
if relaysWhoFinishedInitialResults == Set(desiredAndConnectedRelays) {
|
||||
continuation.yield(with: .success(.eose))
|
||||
eoseSent = true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user