Revert "Reduce battery usage by using exp backoff on connections"
This is causing pretty bad fail to reconnect issues This reverts commit252a77fd97, reversing changes made toa611a5d252.
This commit is contained in:
@@ -31,9 +31,9 @@ class EventsModel: ObservableObject {
|
||||
}
|
||||
|
||||
func subscribe() {
|
||||
state.pool.subscribe_to(sub_id: sub_id,
|
||||
filters: [get_filter()],
|
||||
handler: handle_nostr_event)
|
||||
state.pool.subscribe(sub_id: sub_id,
|
||||
filters: [get_filter()],
|
||||
handler: handle_nostr_event)
|
||||
}
|
||||
|
||||
func unsubscribe() {
|
||||
|
||||
@@ -40,7 +40,7 @@ class FollowersModel: ObservableObject {
|
||||
let filter = get_filter()
|
||||
let filters = [filter]
|
||||
print_filters(relay_id: "following", filters: [filters])
|
||||
self.damus_state.pool.subscribe_to(sub_id: sub_id, filters: filters, handler: handle_event)
|
||||
self.damus_state.pool.subscribe(sub_id: sub_id, filters: filters, handler: handle_event)
|
||||
}
|
||||
|
||||
func unsubscribe() {
|
||||
|
||||
@@ -41,7 +41,7 @@ class FollowingModel {
|
||||
}
|
||||
let filters = [filter]
|
||||
print_filters(relay_id: "following", filters: [filters])
|
||||
self.damus_state.pool.subscribe_to(sub_id: sub_id, filters: filters, handler: handle_event)
|
||||
self.damus_state.pool.subscribe(sub_id: sub_id, filters: filters, handler: handle_event)
|
||||
}
|
||||
|
||||
func unsubscribe() {
|
||||
|
||||
@@ -83,8 +83,8 @@ class ProfileModel: ObservableObject, Equatable {
|
||||
|
||||
print("subscribing to profile \(pubkey) with sub_id \(sub_id)")
|
||||
print_filters(relay_id: "profile", filters: [[text_filter], [profile_filter]])
|
||||
damus.pool.subscribe_to(sub_id: sub_id, filters: [text_filter], handler: handle_event)
|
||||
damus.pool.subscribe_to(sub_id: prof_subid, filters: [profile_filter], handler: handle_event)
|
||||
damus.pool.subscribe(sub_id: sub_id, filters: [text_filter], handler: handle_event)
|
||||
damus.pool.subscribe(sub_id: prof_subid, filters: [profile_filter], handler: handle_event)
|
||||
}
|
||||
|
||||
func handle_profile_contact_event(_ ev: NostrEvent) {
|
||||
|
||||
@@ -38,7 +38,7 @@ class SearchHomeModel: ObservableObject {
|
||||
func subscribe() {
|
||||
loading = true
|
||||
let to_relays = determine_to_relays(pool: damus_state.pool, filters: damus_state.relay_filters)
|
||||
damus_state.pool.subscribe_to(sub_id: base_subid, filters: [get_base_filter()], to: to_relays, handler: handle_event)
|
||||
damus_state.pool.subscribe(sub_id: base_subid, filters: [get_base_filter()], handler: handle_event, to: to_relays)
|
||||
}
|
||||
|
||||
func unsubscribe(to: String? = nil) {
|
||||
|
||||
@@ -104,8 +104,8 @@ class ThreadModel: ObservableObject {
|
||||
|
||||
print("subscribing to thread \(event.id) with sub_id \(base_subid)")
|
||||
loading = true
|
||||
damus_state.pool.subscribe_to(sub_id: base_subid, filters: base_filters, handler: handle_event)
|
||||
damus_state.pool.subscribe_to(sub_id: meta_subid, filters: meta_filters, handler: handle_event)
|
||||
damus_state.pool.subscribe(sub_id: base_subid, filters: base_filters, handler: handle_event)
|
||||
damus_state.pool.subscribe(sub_id: meta_subid, filters: meta_filters, handler: handle_event)
|
||||
}
|
||||
|
||||
func add_event(_ ev: NostrEvent, privkey: String?) {
|
||||
|
||||
@@ -29,7 +29,7 @@ class ZapsModel: ObservableObject {
|
||||
case .note(let note_target):
|
||||
filter.referenced_ids = [note_target.note_id]
|
||||
}
|
||||
state.pool.subscribe_to(sub_id: zaps_subid, filters: [filter], handler: handle_event)
|
||||
state.pool.subscribe(sub_id: zaps_subid, filters: [filter], handler: handle_event)
|
||||
}
|
||||
|
||||
func unsubscribe() {
|
||||
|
||||
@@ -14,15 +14,9 @@ enum NostrConnectionEvent {
|
||||
}
|
||||
|
||||
final class RelayConnection: WebSocketDelegate {
|
||||
enum State {
|
||||
case notConnected
|
||||
case connecting
|
||||
case reconnecting
|
||||
case connected
|
||||
case failed
|
||||
}
|
||||
|
||||
private(set) var state: State = .notConnected
|
||||
private(set) var isConnected = false
|
||||
private(set) var isConnecting = false
|
||||
private(set) var isReconnecting = false
|
||||
|
||||
private(set) var last_connection_attempt: TimeInterval = 0
|
||||
private lazy var socket = {
|
||||
@@ -31,36 +25,38 @@ final class RelayConnection: WebSocketDelegate {
|
||||
socket.delegate = self
|
||||
return socket
|
||||
}()
|
||||
private let eventHandler: (NostrConnectionEvent) -> ()
|
||||
let url: URL
|
||||
|
||||
init(url: URL, eventHandler: @escaping (NostrConnectionEvent) -> ()) {
|
||||
private var handleEvent: (NostrConnectionEvent) -> ()
|
||||
private let url: URL
|
||||
|
||||
init(url: URL, handleEvent: @escaping (NostrConnectionEvent) -> ()) {
|
||||
self.url = url
|
||||
self.eventHandler = eventHandler
|
||||
self.handleEvent = handleEvent
|
||||
}
|
||||
|
||||
func reconnect() {
|
||||
if state == .connected {
|
||||
state = .reconnecting
|
||||
if isConnected {
|
||||
isReconnecting = true
|
||||
disconnect()
|
||||
} else {
|
||||
// we're already disconnected, so just connect
|
||||
connect()
|
||||
connect(force: true)
|
||||
}
|
||||
}
|
||||
|
||||
func connect(force: Bool = false) {
|
||||
if !force && (state == .connected || state == .connecting) {
|
||||
if !force && (isConnected || isConnecting) {
|
||||
return
|
||||
}
|
||||
|
||||
state = .connecting
|
||||
isConnecting = true
|
||||
last_connection_attempt = Date().timeIntervalSince1970
|
||||
socket.connect()
|
||||
}
|
||||
|
||||
func disconnect() {
|
||||
socket.disconnect()
|
||||
isConnected = false
|
||||
isConnecting = false
|
||||
}
|
||||
|
||||
func send(_ req: NostrRequest) {
|
||||
@@ -72,52 +68,51 @@ final class RelayConnection: WebSocketDelegate {
|
||||
socket.write(string: req)
|
||||
}
|
||||
|
||||
private func decodeEvent(_ txt: String) throws -> NostrConnectionEvent {
|
||||
if let ev = decode_nostr_event(txt: txt) {
|
||||
return .nostr_event(ev)
|
||||
} else {
|
||||
throw DecodingError.dataCorrupted(.init(codingPath: [], debugDescription: "decoding event failed"))
|
||||
}
|
||||
}
|
||||
|
||||
@MainActor
|
||||
private func handleEvent(_ event: NostrConnectionEvent) async {
|
||||
eventHandler(event)
|
||||
}
|
||||
|
||||
// MARK: - WebSocketDelegate
|
||||
|
||||
func didReceive(event: WebSocketEvent, client: WebSocket) {
|
||||
switch event {
|
||||
case .connected:
|
||||
state = .connected
|
||||
self.isConnected = true
|
||||
self.isConnecting = false
|
||||
|
||||
case .disconnected:
|
||||
if state == .reconnecting {
|
||||
connect()
|
||||
} else {
|
||||
state = .notConnected
|
||||
self.isConnecting = false
|
||||
self.isConnected = false
|
||||
if self.isReconnecting {
|
||||
self.isReconnecting = false
|
||||
self.connect()
|
||||
}
|
||||
|
||||
case .cancelled, .error:
|
||||
state = .failed
|
||||
self.isConnecting = false
|
||||
self.isConnected = false
|
||||
|
||||
case .text(let txt):
|
||||
Task(priority: .userInitiated) {
|
||||
do {
|
||||
let event = try decodeEvent(txt)
|
||||
await handleEvent(event)
|
||||
} catch {
|
||||
print("decode failed for \(txt): \(error)")
|
||||
// TODO: trigger event error
|
||||
if txt.count > 2000 {
|
||||
DispatchQueue.global(qos: .default).async {
|
||||
if let ev = decode_nostr_event(txt: txt) {
|
||||
DispatchQueue.main.async {
|
||||
self.handleEvent(.nostr_event(ev))
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if let ev = decode_nostr_event(txt: txt) {
|
||||
handleEvent(.nostr_event(ev))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
print("decode failed for \(txt)")
|
||||
// TODO: trigger event error
|
||||
|
||||
default:
|
||||
break
|
||||
}
|
||||
|
||||
eventHandler(.ws_event(event))
|
||||
handleEvent(.ws_event(event))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,22 @@
|
||||
|
||||
import Foundation
|
||||
|
||||
struct SubscriptionId: Identifiable, CustomStringConvertible {
|
||||
let id: String
|
||||
|
||||
var description: String {
|
||||
id
|
||||
}
|
||||
}
|
||||
|
||||
struct RelayId: Identifiable, CustomStringConvertible {
|
||||
let id: String
|
||||
|
||||
var description: String {
|
||||
id
|
||||
}
|
||||
}
|
||||
|
||||
struct RelayHandler {
|
||||
let sub_id: String
|
||||
let callback: (String, NostrConnectionEvent) -> ()
|
||||
@@ -17,58 +33,58 @@ struct QueuedRequest {
|
||||
let relay: String
|
||||
}
|
||||
|
||||
final class RelayPool {
|
||||
enum Constants {
|
||||
/// Used for an exponential backoff algorithm when retrying stale connections
|
||||
/// Each retry attempt will be delayed by raising this base delay to an exponent
|
||||
/// equal to the number of previous retries.
|
||||
static let base_reconnect_delay: TimeInterval = 2
|
||||
static let max_queued_requests = 10
|
||||
static let max_retry_attempts = 3
|
||||
}
|
||||
|
||||
private(set) var relays: [Relay] = []
|
||||
private(set) var handlers: [RelayHandler] = []
|
||||
private var request_queue: [QueuedRequest] = []
|
||||
private(set) var seen: Set<String> = Set()
|
||||
private(set) var counts: [String: UInt64] = [:]
|
||||
private var retry_attempts_per_relay: [URL: Int] = [:]
|
||||
struct NostrRequestId: Equatable, Hashable {
|
||||
let relay: String?
|
||||
let sub_id: String
|
||||
}
|
||||
|
||||
class RelayPool {
|
||||
var relays: [Relay] = []
|
||||
var handlers: [RelayHandler] = []
|
||||
var request_queue: [QueuedRequest] = []
|
||||
var seen: Set<String> = Set()
|
||||
var counts: [String: UInt64] = [:]
|
||||
|
||||
var descriptors: [RelayDescriptor] {
|
||||
relays.map { $0.descriptor }
|
||||
}
|
||||
|
||||
var num_connecting: Int {
|
||||
relays.reduce(0) { n, r in n + (r.connection.state == .connecting ? 1 : 0) }
|
||||
return relays.reduce(0) { n, r in n + (r.connection.isConnecting ? 1 : 0) }
|
||||
}
|
||||
|
||||
func remove_handler(sub_id: String) {
|
||||
guard let index = handlers.firstIndex(where: { $0.sub_id == sub_id }) else {
|
||||
return
|
||||
}
|
||||
handlers.remove(at: index)
|
||||
self.handlers = handlers.filter { $0.sub_id != sub_id }
|
||||
print("removing \(sub_id) handler, current: \(handlers.count)")
|
||||
}
|
||||
|
||||
func register_handler(sub_id: String, handler: @escaping (String, NostrConnectionEvent) -> ()) {
|
||||
guard !handlers.contains(where: { $0.sub_id == sub_id }) else {
|
||||
return // don't add duplicate handlers
|
||||
for handler in handlers {
|
||||
// don't add duplicate handlers
|
||||
if handler.sub_id == sub_id {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
handlers.append(RelayHandler(sub_id: sub_id, callback: handler))
|
||||
self.handlers.append(RelayHandler(sub_id: sub_id, callback: handler))
|
||||
print("registering \(sub_id) handler, current: \(self.handlers.count)")
|
||||
}
|
||||
|
||||
func remove_relay(_ relay_id: String) {
|
||||
disconnect(from: [relay_id])
|
||||
var i: Int = 0
|
||||
|
||||
if let index = relays.firstIndex(where: { $0.id == relay_id }) {
|
||||
relays.remove(at: index)
|
||||
self.disconnect(to: [relay_id])
|
||||
|
||||
for relay in relays {
|
||||
if relay.id == relay_id {
|
||||
relays.remove(at: i)
|
||||
break
|
||||
}
|
||||
|
||||
i += 1
|
||||
}
|
||||
}
|
||||
|
||||
@discardableResult
|
||||
func add_relay(_ url: URL, info: RelayInfo) throws -> Relay {
|
||||
func add_relay(_ url: URL, info: RelayInfo) throws {
|
||||
let relay_id = get_relay_id(url)
|
||||
if get_relay(relay_id) != nil {
|
||||
throw RelayError.RelayAlreadyExists
|
||||
@@ -78,57 +94,40 @@ final class RelayPool {
|
||||
}
|
||||
let descriptor = RelayDescriptor(url: url, info: info)
|
||||
let relay = Relay(descriptor: descriptor, connection: conn)
|
||||
relays.append(relay)
|
||||
return relay
|
||||
self.relays.append(relay)
|
||||
}
|
||||
|
||||
/// This is used to retry dead connections
|
||||
func connect_to_disconnected() {
|
||||
for relay in relays where !relay.is_broken && relay.connection.state != .connected {
|
||||
for relay in relays {
|
||||
let c = relay.connection
|
||||
|
||||
let is_connecting = c.state == .reconnecting || c.state == .connecting
|
||||
let is_connecting = c.isReconnecting || c.isConnecting
|
||||
|
||||
let retry_attempts = retry_attempts_per_relay[c.url] ?? 0
|
||||
|
||||
let delay = pow(Constants.base_reconnect_delay, TimeInterval(retry_attempts + 1)) // the + 1 helps us avoid a 1-second delay for the first retry
|
||||
if is_connecting && (Date.now.timeIntervalSince1970 - c.last_connection_attempt) > delay {
|
||||
if retry_attempts > Constants.max_retry_attempts {
|
||||
if c.state != .notConnected {
|
||||
c.disconnect()
|
||||
print("exceeded max connection attempts with \(relay.descriptor.url.absoluteString)")
|
||||
relay.mark_broken()
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
print("stale connection detected (\(relay.descriptor.url.absoluteString)). retrying after \(delay) seconds...")
|
||||
c.connect(force: true)
|
||||
retry_attempts_per_relay[c.url] = retry_attempts + 1
|
||||
}
|
||||
} else if is_connecting {
|
||||
if is_connecting && (Date.now.timeIntervalSince1970 - c.last_connection_attempt) > 5 {
|
||||
print("stale connection detected (\(relay.descriptor.url.absoluteString)). retrying...")
|
||||
relay.connection.connect(force: true)
|
||||
} else if relay.is_broken || is_connecting || c.isConnected {
|
||||
continue
|
||||
} else {
|
||||
c.reconnect()
|
||||
relay.connection.reconnect()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func reconnect(to relay_ids: [String]? = nil) {
|
||||
let relays: [Relay]
|
||||
if let relay_ids {
|
||||
relays = get_relays(relay_ids)
|
||||
} else {
|
||||
relays = self.relays
|
||||
}
|
||||
|
||||
for relay in relays where !relay.is_broken {
|
||||
func reconnect(to: [String]? = nil) {
|
||||
let relays = to.map{ get_relays($0) } ?? self.relays
|
||||
for relay in relays {
|
||||
// don't try to reconnect to broken relays
|
||||
relay.connection.reconnect()
|
||||
}
|
||||
}
|
||||
|
||||
func mark_broken(_ relay_id: String) {
|
||||
relays.first(where: { $0.id == relay_id })?.mark_broken()
|
||||
for relay in relays {
|
||||
relay.mark_broken()
|
||||
}
|
||||
}
|
||||
|
||||
func connect(to: [String]? = nil) {
|
||||
@@ -138,8 +137,8 @@ final class RelayPool {
|
||||
}
|
||||
}
|
||||
|
||||
private func disconnect(from: [String]? = nil) {
|
||||
let relays = from.map{ get_relays($0) } ?? self.relays
|
||||
func disconnect(to: [String]? = nil) {
|
||||
let relays = to.map{ get_relays($0) } ?? self.relays
|
||||
for relay in relays {
|
||||
relay.connection.disconnect()
|
||||
}
|
||||
@@ -147,23 +146,35 @@ final class RelayPool {
|
||||
|
||||
func unsubscribe(sub_id: String, to: [String]? = nil) {
|
||||
if to == nil {
|
||||
remove_handler(sub_id: sub_id)
|
||||
self.remove_handler(sub_id: sub_id)
|
||||
}
|
||||
send(.unsubscribe(sub_id), to: to)
|
||||
self.send(.unsubscribe(sub_id), to: to)
|
||||
}
|
||||
|
||||
func subscribe_to(sub_id: String, filters: [NostrFilter], to: [String]? = nil, handler: @escaping (String, NostrConnectionEvent) -> ()) {
|
||||
func subscribe(sub_id: String, filters: [NostrFilter], handler: @escaping (String, NostrConnectionEvent) -> (), to: [String]? = nil) {
|
||||
register_handler(sub_id: sub_id, handler: handler)
|
||||
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
|
||||
}
|
||||
|
||||
func subscribe_to(sub_id: String, filters: [NostrFilter], to: [String]?, handler: @escaping (String, NostrConnectionEvent) -> ()) {
|
||||
register_handler(sub_id: sub_id, handler: handler)
|
||||
send(.subscribe(.init(filters: filters, sub_id: sub_id)), to: to)
|
||||
}
|
||||
|
||||
func count_queued(relay: String) -> Int {
|
||||
request_queue.filter({ $0.relay == relay }).count
|
||||
var c = 0
|
||||
for request in request_queue {
|
||||
if request.relay == relay {
|
||||
c += 1
|
||||
}
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func queue_req(r: NostrRequest, relay: String) {
|
||||
let count = count_queued(relay: relay)
|
||||
guard count < Constants.max_queued_requests else {
|
||||
guard count <= 10 else {
|
||||
print("can't queue, too many queued events for \(relay)")
|
||||
return
|
||||
}
|
||||
@@ -173,10 +184,10 @@ final class RelayPool {
|
||||
}
|
||||
|
||||
func send(_ req: NostrRequest, to: [String]? = nil) {
|
||||
let relays = to.map { get_relays($0) } ?? self.relays
|
||||
|
||||
let relays = to.map{ get_relays($0) } ?? self.relays
|
||||
|
||||
for relay in relays {
|
||||
guard relay.connection.state == .connected else {
|
||||
guard relay.connection.isConnected else {
|
||||
queue_req(r: req, relay: relay.id)
|
||||
continue
|
||||
}
|
||||
@@ -196,14 +207,17 @@ final class RelayPool {
|
||||
func record_last_pong(relay_id: String, event: NostrConnectionEvent) {
|
||||
if case .ws_event(let ws_event) = event {
|
||||
if case .pong = ws_event {
|
||||
if let relay = relays.first(where: { $0.id == relay_id }) {
|
||||
relay.last_pong = UInt32(Date.now.timeIntervalSince1970)
|
||||
for relay in relays {
|
||||
if relay.id == relay_id {
|
||||
relay.last_pong = UInt32(Date.now.timeIntervalSince1970)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func run_queue(_ relay_id: String) {
|
||||
func run_queue(_ relay_id: String) {
|
||||
self.request_queue = request_queue.reduce(into: Array<QueuedRequest>()) { (q, req) in
|
||||
guard req.relay == relay_id else {
|
||||
q.append(req)
|
||||
@@ -221,14 +235,17 @@ final class RelayPool {
|
||||
let k = relay_id + nev.id
|
||||
if !seen.contains(k) {
|
||||
seen.insert(k)
|
||||
let prev_count = counts[relay_id] ?? 0
|
||||
counts[relay_id] = prev_count + 1
|
||||
if counts[relay_id] == nil {
|
||||
counts[relay_id] = 1
|
||||
} else {
|
||||
counts[relay_id] = (counts[relay_id] ?? 0) + 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func handle_event(relay_id: String, event: NostrConnectionEvent) {
|
||||
func handle_event(relay_id: String, event: NostrConnectionEvent) {
|
||||
record_last_pong(relay_id: relay_id, event: event)
|
||||
record_seen(relay_id: relay_id, event: event)
|
||||
|
||||
@@ -248,5 +265,7 @@ final class RelayPool {
|
||||
|
||||
func add_rw_relay(_ pool: RelayPool, _ url: String) {
|
||||
let url_ = URL(string: url)!
|
||||
let _ = try? pool.add_relay(url_, info: RelayInfo.rw)
|
||||
try? pool.add_relay(url_, info: RelayInfo.rw)
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -7,16 +7,6 @@
|
||||
|
||||
import SwiftUI
|
||||
|
||||
extension RelayConnection.State {
|
||||
var indicatorColor: Color {
|
||||
switch self {
|
||||
case .connected: return .green
|
||||
case .connecting, .reconnecting: return .yellow
|
||||
default: return .red
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct RelayStatus: View {
|
||||
let pool: RelayPool
|
||||
let relay: String
|
||||
@@ -26,10 +16,18 @@ struct RelayStatus: View {
|
||||
@State var conn_color: Color = .gray
|
||||
|
||||
func update_connection_color() {
|
||||
guard let relay = pool.relays.first(where: { $0.id == relay }) else {
|
||||
return
|
||||
for relay in pool.relays {
|
||||
if relay.id == self.relay {
|
||||
let c = relay.connection
|
||||
if c.isConnected {
|
||||
conn_color = .green
|
||||
} else if c.isConnecting || c.isReconnecting {
|
||||
conn_color = .yellow
|
||||
} else {
|
||||
conn_color = .red
|
||||
}
|
||||
}
|
||||
}
|
||||
conn_color = relay.connection.state.indicatorColor
|
||||
}
|
||||
|
||||
var body: some View {
|
||||
|
||||
Reference in New Issue
Block a user