Replace Starscream with URLSessionWebSocketTask
Changelog-Fixed: Fix slow reconnection issues
This commit is contained in:
committed by
William Casarin
parent
6ac68b5a73
commit
0e94c48e26
@@ -52,14 +52,12 @@ class Relay: Identifiable {
|
||||
let descriptor: RelayDescriptor
|
||||
let connection: RelayConnection
|
||||
|
||||
var last_pong: UInt32
|
||||
var flags: Int
|
||||
|
||||
init(descriptor: RelayDescriptor, connection: RelayConnection) {
|
||||
self.flags = 0
|
||||
self.descriptor = descriptor
|
||||
self.connection = connection
|
||||
self.last_pong = 0
|
||||
}
|
||||
|
||||
func mark_broken() {
|
||||
|
||||
@@ -5,26 +5,22 @@
|
||||
// Created by William Casarin on 2022-04-02.
|
||||
//
|
||||
|
||||
import Combine
|
||||
import Foundation
|
||||
import Starscream
|
||||
|
||||
enum NostrConnectionEvent {
|
||||
case ws_event(WebSocketEvent)
|
||||
case nostr_event(NostrResponse)
|
||||
}
|
||||
|
||||
final class RelayConnection: WebSocketDelegate {
|
||||
final class RelayConnection {
|
||||
private(set) var isConnected = false
|
||||
private(set) var isConnecting = false
|
||||
private(set) var isReconnecting = false
|
||||
|
||||
private(set) var last_connection_attempt: TimeInterval = 0
|
||||
private lazy var socket = {
|
||||
let req = URLRequest(url: url)
|
||||
let socket = WebSocket(request: req, compressionHandler: .none)
|
||||
socket.delegate = self
|
||||
return socket
|
||||
}()
|
||||
private lazy var socket = WebSocket(url)
|
||||
private var subscriptionToken: AnyCancellable?
|
||||
|
||||
private var handleEvent: (NostrConnectionEvent) -> ()
|
||||
private let url: URL
|
||||
|
||||
@@ -33,16 +29,6 @@ final class RelayConnection: WebSocketDelegate {
|
||||
self.handleEvent = handleEvent
|
||||
}
|
||||
|
||||
func reconnect() {
|
||||
if isConnected {
|
||||
isReconnecting = true
|
||||
disconnect()
|
||||
} else {
|
||||
// we're already disconnected, so just connect
|
||||
connect(force: true)
|
||||
}
|
||||
}
|
||||
|
||||
func connect(force: Bool = false) {
|
||||
if !force && (isConnected || isConnecting) {
|
||||
return
|
||||
@@ -50,11 +36,27 @@ final class RelayConnection: WebSocketDelegate {
|
||||
|
||||
isConnecting = true
|
||||
last_connection_attempt = Date().timeIntervalSince1970
|
||||
|
||||
subscriptionToken = socket.subject
|
||||
.receive(on: DispatchQueue.main)
|
||||
.sink { [weak self] completion in
|
||||
switch completion {
|
||||
case .failure(let error):
|
||||
self?.receive(event: .error(error))
|
||||
case .finished:
|
||||
self?.receive(event: .disconnected(.normalClosure, nil))
|
||||
}
|
||||
} receiveValue: { [weak self] event in
|
||||
self?.receive(event: event)
|
||||
}
|
||||
|
||||
socket.connect()
|
||||
}
|
||||
|
||||
func disconnect() {
|
||||
socket.disconnect()
|
||||
subscriptionToken = nil
|
||||
|
||||
isConnected = false
|
||||
isConnecting = false
|
||||
}
|
||||
@@ -64,34 +66,46 @@ final class RelayConnection: WebSocketDelegate {
|
||||
print("failed to encode nostr req: \(req)")
|
||||
return
|
||||
}
|
||||
|
||||
socket.write(string: req)
|
||||
socket.send(.string(req))
|
||||
}
|
||||
|
||||
// MARK: - WebSocketDelegate
|
||||
|
||||
func didReceive(event: WebSocketEvent, client: WebSocket) {
|
||||
private func receive(event: WebSocketEvent) {
|
||||
switch event {
|
||||
case .connected:
|
||||
self.isConnected = true
|
||||
self.isConnecting = false
|
||||
|
||||
case .disconnected:
|
||||
self.isConnecting = false
|
||||
self.isConnected = false
|
||||
if self.isReconnecting {
|
||||
self.isReconnecting = false
|
||||
self.connect()
|
||||
case .message(let message):
|
||||
self.receive(message: message)
|
||||
case .disconnected(let closeCode, let reason):
|
||||
if closeCode != .normalClosure {
|
||||
print("⚠️ Warning: RelayConnection (\(self.url)) closed with code \(closeCode), reason: \(String(describing: reason))")
|
||||
}
|
||||
|
||||
case .cancelled, .error:
|
||||
self.isConnecting = false
|
||||
self.isConnected = false
|
||||
|
||||
case .text(let txt):
|
||||
if txt.utf8.count > 2000 {
|
||||
isConnected = false
|
||||
isConnecting = false
|
||||
reconnect()
|
||||
case .error(let error):
|
||||
print("⚠️ Warning: RelayConnection (\(self.url)) error: \(error)")
|
||||
isConnected = false
|
||||
isConnecting = false
|
||||
reconnect()
|
||||
}
|
||||
self.handleEvent(.ws_event(event))
|
||||
}
|
||||
|
||||
func reconnect() {
|
||||
guard !isConnecting else {
|
||||
return // we're already trying to connect
|
||||
}
|
||||
disconnect()
|
||||
connect()
|
||||
}
|
||||
|
||||
private func receive(message: URLSessionWebSocketTask.Message) {
|
||||
switch message {
|
||||
case .string(let messageString):
|
||||
if messageString.utf8.count > 2000 {
|
||||
DispatchQueue.global(qos: .default).async {
|
||||
if let ev = decode_nostr_event(txt: txt) {
|
||||
if let ev = decode_nostr_event(txt: messageString) {
|
||||
DispatchQueue.main.async {
|
||||
self.handleEvent(.nostr_event(ev))
|
||||
}
|
||||
@@ -99,18 +113,18 @@ final class RelayConnection: WebSocketDelegate {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if let ev = decode_nostr_event(txt: txt) {
|
||||
if let ev = decode_nostr_event(txt: messageString) {
|
||||
handleEvent(.nostr_event(ev))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
default:
|
||||
break
|
||||
case .data(let messageData):
|
||||
if let messageString = String(data: messageData, encoding: .utf8) {
|
||||
receive(message: .string(messageString))
|
||||
}
|
||||
@unknown default:
|
||||
print("An unexpected URLSessionWebSocketTask.Message was received.")
|
||||
}
|
||||
|
||||
handleEvent(.ws_event(event))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
//
|
||||
|
||||
import Foundation
|
||||
import Network
|
||||
|
||||
struct SubscriptionId: Identifiable, CustomStringConvertible {
|
||||
let id: String
|
||||
@@ -44,7 +45,24 @@ class RelayPool {
|
||||
var request_queue: [QueuedRequest] = []
|
||||
var seen: Set<String> = Set()
|
||||
var counts: [String: UInt64] = [:]
|
||||
|
||||
private let network_monitor = NWPathMonitor()
|
||||
private let network_monitor_queue = DispatchQueue(label: "io.damus.network_monitor")
|
||||
private var last_network_status: NWPath.Status = .unsatisfied
|
||||
|
||||
init() {
|
||||
network_monitor.pathUpdateHandler = { [weak self] path in
|
||||
if (path.status == .satisfied || path.status == .requiresConnection) && self?.last_network_status != path.status {
|
||||
DispatchQueue.main.async {
|
||||
self?.connect_to_disconnected()
|
||||
}
|
||||
}
|
||||
|
||||
self?.last_network_status = path.status
|
||||
}
|
||||
network_monitor.start(queue: network_monitor_queue)
|
||||
}
|
||||
|
||||
var descriptors: [RelayDescriptor] {
|
||||
relays.map { $0.descriptor }
|
||||
}
|
||||
@@ -106,11 +124,11 @@ class RelayPool {
|
||||
for relay in relays {
|
||||
let c = relay.connection
|
||||
|
||||
let is_connecting = c.isReconnecting || c.isConnecting
|
||||
let is_connecting = c.isConnecting
|
||||
|
||||
if is_connecting && (Date.now.timeIntervalSince1970 - c.last_connection_attempt) > 5 {
|
||||
print("stale connection detected (\(relay.descriptor.url.absoluteString)). retrying...")
|
||||
relay.connection.connect(force: true)
|
||||
relay.connection.reconnect()
|
||||
} else if relay.is_broken || is_connecting || c.isConnected {
|
||||
continue
|
||||
} else {
|
||||
@@ -208,19 +226,6 @@ class RelayPool {
|
||||
relays.first(where: { $0.id == id })
|
||||
}
|
||||
|
||||
func record_last_pong(relay_id: String, event: NostrConnectionEvent) {
|
||||
if case .ws_event(let ws_event) = event {
|
||||
if case .pong = ws_event {
|
||||
for relay in relays {
|
||||
if relay.id == relay_id {
|
||||
relay.last_pong = UInt32(Date.now.timeIntervalSince1970)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func run_queue(_ relay_id: String) {
|
||||
self.request_queue = request_queue.reduce(into: Array<QueuedRequest>()) { (q, req) in
|
||||
guard req.relay == relay_id else {
|
||||
@@ -250,7 +255,6 @@ class RelayPool {
|
||||
}
|
||||
|
||||
func handle_event(relay_id: String, event: NostrConnectionEvent) {
|
||||
record_last_pong(relay_id: relay_id, event: event)
|
||||
record_seen(relay_id: relay_id, event: event)
|
||||
|
||||
// run req queue when we reconnect
|
||||
|
||||
87
damus/Nostr/WebSocket.swift
Normal file
87
damus/Nostr/WebSocket.swift
Normal file
@@ -0,0 +1,87 @@
|
||||
//
|
||||
// WebSocket.swift
|
||||
// damus
|
||||
//
|
||||
// Created by Bryan Montz on 4/13/23.
|
||||
//
|
||||
|
||||
import Combine
|
||||
import Foundation
|
||||
|
||||
enum WebSocketEvent {
|
||||
case connected
|
||||
case message(URLSessionWebSocketTask.Message)
|
||||
case disconnected(URLSessionWebSocketTask.CloseCode, String?)
|
||||
case error(Error)
|
||||
}
|
||||
|
||||
final class WebSocket: NSObject, URLSessionWebSocketDelegate {
|
||||
|
||||
private let url: URL
|
||||
private let session: URLSession
|
||||
private lazy var webSocketTask: URLSessionWebSocketTask = {
|
||||
let task = session.webSocketTask(with: url)
|
||||
task.delegate = self
|
||||
return task
|
||||
}()
|
||||
|
||||
let subject = PassthroughSubject<WebSocketEvent, Never>()
|
||||
|
||||
init(_ url: URL, session: URLSession = .shared) {
|
||||
self.url = url
|
||||
self.session = session
|
||||
}
|
||||
|
||||
func connect() {
|
||||
resume()
|
||||
}
|
||||
|
||||
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode = .normalClosure, reason: Data? = nil) {
|
||||
webSocketTask.cancel(with: closeCode, reason: reason)
|
||||
|
||||
// reset after disconnecting to be ready for reconnecting
|
||||
let task = session.webSocketTask(with: url)
|
||||
task.delegate = self
|
||||
webSocketTask = task
|
||||
|
||||
let reason_str: String?
|
||||
if let reason {
|
||||
reason_str = String(data: reason, encoding: .utf8)
|
||||
} else {
|
||||
reason_str = nil
|
||||
}
|
||||
subject.send(.disconnected(closeCode, reason_str))
|
||||
}
|
||||
|
||||
func send(_ message: URLSessionWebSocketTask.Message) {
|
||||
webSocketTask.send(message) { [weak self] error in
|
||||
if let error {
|
||||
self?.subject.send(.error(error))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func resume() {
|
||||
webSocketTask.receive { [weak self] result in
|
||||
switch result {
|
||||
case .success(let message):
|
||||
self?.subject.send(.message(message))
|
||||
self?.resume()
|
||||
case .failure(let error):
|
||||
self?.subject.send(.error(error))
|
||||
}
|
||||
}
|
||||
|
||||
webSocketTask.resume()
|
||||
}
|
||||
|
||||
// MARK: - URLSessionWebSocketDelegate
|
||||
|
||||
func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol theProtocol: String?) {
|
||||
subject.send(.connected)
|
||||
}
|
||||
|
||||
func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
|
||||
disconnect(closeCode: closeCode, reason: reason)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user