Consume NIP-19 relay hints for event fetching
Extract and use relay hints from bech32 entities (nevent, nprofile, naddr) and event tag references (e, q tags) to fetch events from hinted relays not in the user's relay pool. Changes: - Parse relay hints from bech32 TLV data in URLHandler - Pass relay hints through SearchType and NoteReference enums - Add ensureConnected() to RelayPool for ephemeral relay connections - Implement ephemeral relay lease management with race condition protection - Add repostTarget() helper to extract relay hints from repost e tags - Add QuoteRef struct to preserve relay hints from q tags (NIP-10/NIP-18) - Support relay hints in replies with author pubkey in e-tags (NIP-10) - Implement fallback broadcast when hinted relays don't respond - Add comprehensive test coverage for relay hint functionality - Add DEBUG logging for relay hint tracing during development Implementation details: - Connect to hinted relays as ephemeral, returning early when first connects - Use total deadline to prevent timeout accumulation across hint attempts - Decrement lease count before suspension points to ensure atomicity - Fall back to broadcast if hints don't resolve or respond Closes: https://github.com/damus-io/damus/issues/1147 Changelog-Added: Added relay hint support for nevent, nprofile, naddr links and event tag references (reposts, quotes, replies) Signed-off-by: alltheseas Signed-off-by: Daniel D'Aquino <daniel@daquino.me> Co-authored-by: alltheseas <alltheseas@users.noreply.github.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: Daniel D'Aquino <daniel@daquino.me
This commit is contained in:
@@ -409,25 +409,89 @@ extension NostrNetworkManager {
|
||||
|
||||
// MARK: - Finding specific data from Nostr
|
||||
|
||||
/// Finds a non-replaceable event based on a note ID
|
||||
/// Finds a non-replaceable event based on a note ID.
|
||||
///
|
||||
/// When relay hints are provided, they get a short exclusive window to respond.
|
||||
/// If no event is found within that window, the remaining time is used to broadcast
|
||||
/// to all connected relays. The `timeout` parameter is a total deadline for both phases.
|
||||
func lookup(noteId: NoteId, to targetRelays: [RelayURL]? = nil, timeout: Duration? = nil) async throws -> NdbNoteLender? {
|
||||
let filter = NostrFilter(ids: [noteId], limit: 1)
|
||||
|
||||
// Since note ids point to immutable objects, we can do a simple ndb lookup first
|
||||
if let noteKey = try? self.ndb.lookup_note_key(noteId) {
|
||||
return NdbNoteLender(ndb: self.ndb, noteKey: noteKey)
|
||||
}
|
||||
|
||||
|
||||
// Not available in local ndb, stream from network
|
||||
outerLoop: for await item in await self.pool.subscribe(filters: [NostrFilter(ids: [noteId], limit: 1)], to: targetRelays, eoseTimeout: timeout) {
|
||||
let filter = NostrFilter(ids: [noteId], limit: 1)
|
||||
let totalTimeout = timeout ?? .seconds(10)
|
||||
let startTime = ContinuousClock.now
|
||||
|
||||
// If relay hints provided, try them first with a short timeout
|
||||
if let targetRelays, !targetRelays.isEmpty {
|
||||
// Acquire ephemeral relays and connect to them
|
||||
await self.pool.acquireEphemeralRelays(targetRelays)
|
||||
defer {
|
||||
Task { await self.pool.releaseEphemeralRelays(targetRelays) }
|
||||
}
|
||||
|
||||
let connectedRelays = await self.pool.ensureConnected(to: targetRelays)
|
||||
guard !connectedRelays.isEmpty else {
|
||||
#if DEBUG
|
||||
Self.logger.info("lookup(noteId): No hint relays connected, skipping to broadcast")
|
||||
#endif
|
||||
return await fetchFromRelays(filter: filter, relays: nil, timeout: totalTimeout)
|
||||
}
|
||||
|
||||
// Use min of 3 seconds or half of total timeout for hint phase
|
||||
let hintTimeout = min(.seconds(3), totalTimeout / 2)
|
||||
|
||||
#if DEBUG
|
||||
Self.logger.info("lookup(noteId): Trying \(connectedRelays.count)/\(targetRelays.count) hint relay(s) with \(hintTimeout) timeout")
|
||||
#endif
|
||||
|
||||
let result = await fetchFromRelays(filter: filter, relays: connectedRelays, timeout: hintTimeout)
|
||||
if let result {
|
||||
return result
|
||||
}
|
||||
|
||||
// Calculate remaining time for broadcast phase
|
||||
let elapsed = ContinuousClock.now - startTime
|
||||
let remaining = totalTimeout - elapsed
|
||||
|
||||
guard remaining > .zero else {
|
||||
#if DEBUG
|
||||
Self.logger.info("lookup(noteId): Total timeout exceeded, skipping broadcast")
|
||||
#endif
|
||||
return nil
|
||||
}
|
||||
|
||||
// Hint relays didn't respond, fallback to broadcast with remaining time
|
||||
#if DEBUG
|
||||
Self.logger.info("lookup(noteId): Hint relays didn't respond, falling back to broadcast (\(remaining) remaining)")
|
||||
#endif
|
||||
return await fetchFromRelays(filter: filter, relays: nil, timeout: remaining)
|
||||
}
|
||||
|
||||
// No hints, broadcast to all relays
|
||||
return await fetchFromRelays(filter: filter, relays: nil, timeout: totalTimeout)
|
||||
}
|
||||
|
||||
/// Fetches the first event matching the filter from the specified relays.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - filter: The NostrFilter to match events against.
|
||||
/// - relays: Optional relay URLs to query. If nil, broadcasts to all connected relays.
|
||||
/// - timeout: Maximum duration to wait for a response.
|
||||
/// - Returns: An `NdbNoteLender` for the first matching event, or `nil` if EOSE is received
|
||||
/// or the timeout expires without finding a match.
|
||||
private func fetchFromRelays(filter: NostrFilter, relays: [RelayURL]?, timeout: Duration) async -> NdbNoteLender? {
|
||||
for await item in await self.pool.subscribe(filters: [filter], to: relays, eoseTimeout: timeout) {
|
||||
switch item {
|
||||
case .event(let event):
|
||||
return NdbNoteLender(ownedNdbNote: event)
|
||||
case .eose:
|
||||
break outerLoop
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -439,32 +503,53 @@ extension NostrNetworkManager {
|
||||
return events
|
||||
}
|
||||
|
||||
/// Finds a replaceable event based on an `naddr` address.
|
||||
///
|
||||
/// Finds a Nostr event that corresponds to the provided naddr identifier.
|
||||
/// - Parameters:
|
||||
/// - naddr: the `naddr` address
|
||||
/// - naddr: The NAddr (network address) that identifies the target replaceable event (contains kind, author, and identifier).
|
||||
/// - targetRelays: Optional relay URLs to hint where to search; the method may acquire ephemeral relays and will use only the subset of those that become connected.
|
||||
/// - timeout: Optional duration to bound the search.
|
||||
/// - Returns: The matching `NostrEvent` whose first referenced parameter equals `naddr.identifier`, or `nil` if no matching event is found.
|
||||
func lookup(naddr: NAddr, to targetRelays: [RelayURL]? = nil, timeout: Duration? = nil) async -> NostrEvent? {
|
||||
var nostrKinds: [NostrKind]? = NostrKind(rawValue: naddr.kind).map { [$0] }
|
||||
var connectedTargetRelays = targetRelays
|
||||
var ephemeralRelays: [RelayURL] = []
|
||||
if let relays = targetRelays, !relays.isEmpty {
|
||||
await self.pool.acquireEphemeralRelays(relays)
|
||||
ephemeralRelays = relays
|
||||
let connectedRelays = await self.pool.ensureConnected(to: relays)
|
||||
connectedTargetRelays = connectedRelays.isEmpty ? nil : connectedRelays
|
||||
#if DEBUG
|
||||
Self.logger.info("lookup(naddr): Using \(connectedRelays.count)/\(relays.count) relay hints: \(connectedRelays.map { $0.absoluteString }.joined(separator: ", "), privacy: .public)")
|
||||
#endif
|
||||
}
|
||||
|
||||
defer {
|
||||
if !ephemeralRelays.isEmpty {
|
||||
Task { await self.pool.releaseEphemeralRelays(ephemeralRelays) }
|
||||
}
|
||||
}
|
||||
|
||||
let nostrKinds: [NostrKind]? = NostrKind(rawValue: naddr.kind).map { [$0] }
|
||||
let filter = NostrFilter(kinds: nostrKinds, authors: [naddr.author])
|
||||
|
||||
for await noteLender in self.streamExistingEvents(filters: [filter], to: targetRelays, timeout: timeout) {
|
||||
// TODO: This can be refactored to borrow the note instead of copying it. But we need to implement `referenced_params` on `UnownedNdbNote` to do so
|
||||
|
||||
for await noteLender in self.streamExistingEvents(filters: [filter], to: connectedTargetRelays, timeout: timeout) {
|
||||
guard let event = noteLender.justGetACopy() else { continue }
|
||||
if event.referenced_params.first?.param.string() == naddr.identifier {
|
||||
return event
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: Improve this. This is mostly intact to keep compatibility with its predecessor, but we can do better
|
||||
/// Searches for a profile or event specified by `query` and returns the first matching result.
|
||||
/// The function first checks the local NDB cache and, if not found, queries relays (honoring any relay hints in the query).
|
||||
/// - Parameter query: Specifies what to find (profile by pubkey or event by id) and optional relay hints to use for network lookup.
|
||||
/// - Returns: A `FoundEvent` containing the matched profile or event, or `nil` if no match is found.
|
||||
func findEvent(query: FindEvent) async -> FoundEvent? {
|
||||
var filter: NostrFilter? = nil
|
||||
let find_from = query.find_from
|
||||
let query = query.type
|
||||
|
||||
|
||||
switch query {
|
||||
case .profile(let pubkey):
|
||||
let profileNotNil = try? self.ndb.lookup_profile(pubkey, borrow: { pr in
|
||||
@@ -483,12 +568,28 @@ extension NostrNetworkManager {
|
||||
}
|
||||
filter = NostrFilter(ids: [evid], limit: 1)
|
||||
}
|
||||
|
||||
var attempts: Int = 0
|
||||
var has_event = false
|
||||
|
||||
guard let filter else { return nil }
|
||||
|
||||
for await noteLender in self.streamExistingEvents(filters: [filter], to: find_from) {
|
||||
|
||||
var targetRelays = find_from
|
||||
var ephemeralRelays: [RelayURL] = []
|
||||
if let relays = find_from, !relays.isEmpty {
|
||||
await self.pool.acquireEphemeralRelays(relays)
|
||||
ephemeralRelays = relays
|
||||
let connectedRelays = await self.pool.ensureConnected(to: relays)
|
||||
targetRelays = connectedRelays.isEmpty ? nil : connectedRelays
|
||||
#if DEBUG
|
||||
Self.logger.info("findEvent: Using \(connectedRelays.count)/\(relays.count) relay hints: \(connectedRelays.map { $0.absoluteString }.joined(separator: ", "), privacy: .public)")
|
||||
#endif
|
||||
}
|
||||
|
||||
defer {
|
||||
if !ephemeralRelays.isEmpty {
|
||||
Task { await self.pool.releaseEphemeralRelays(ephemeralRelays) }
|
||||
}
|
||||
}
|
||||
|
||||
for await noteLender in self.streamExistingEvents(filters: [filter], to: targetRelays) {
|
||||
let foundEvent: FoundEvent? = try? noteLender.borrow({ event in
|
||||
switch query {
|
||||
case .profile:
|
||||
@@ -659,4 +760,4 @@ extension NostrNetworkManager {
|
||||
/// Preload metadata for authors and referenced profiles
|
||||
case preload
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -49,22 +49,22 @@ protocol TagItemConvertible {
|
||||
|
||||
struct QuoteId: IdType, TagKey, TagConvertible {
|
||||
let id: Data
|
||||
|
||||
|
||||
init(_ data: Data) {
|
||||
self.id = data
|
||||
}
|
||||
|
||||
|
||||
/// The note id being quoted
|
||||
var note_id: NoteId {
|
||||
NoteId(self.id)
|
||||
}
|
||||
|
||||
var keychar: AsciiCharacter { "q" }
|
||||
|
||||
|
||||
var tag: [String] {
|
||||
["q", self.hex()]
|
||||
}
|
||||
|
||||
|
||||
static func from_tag(tag: TagSequence) -> QuoteId? {
|
||||
var i = tag.makeIterator()
|
||||
|
||||
@@ -80,6 +80,52 @@ struct QuoteId: IdType, TagKey, TagConvertible {
|
||||
}
|
||||
}
|
||||
|
||||
/// A quote reference with optional relay hints for fetching.
|
||||
///
|
||||
/// Per NIP-10/NIP-18, `q` tags include a relay URL at position 2 where the quoted
|
||||
/// event can be found.
|
||||
///
|
||||
/// Note: The NIPs allow `q` tags to contain either event IDs (hex) or event addresses
|
||||
/// (`<kind>:<pubkey>:<d>` for replaceable events). This implementation currently only
|
||||
/// supports hex event IDs; quotes of addressable events are not yet handled.
|
||||
struct QuoteRef: TagConvertible {
|
||||
let quote_id: QuoteId
|
||||
let relayHints: [RelayURL]
|
||||
|
||||
/// The note ID being quoted
|
||||
var note_id: NoteId {
|
||||
quote_id.note_id
|
||||
}
|
||||
|
||||
var tag: [String] {
|
||||
var tagBuilder = ["q", quote_id.hex()]
|
||||
if let relay = relayHints.first {
|
||||
tagBuilder.append(relay.absoluteString)
|
||||
}
|
||||
return tagBuilder
|
||||
}
|
||||
|
||||
/// Parses a `q` tag into a QuoteRef, preserving relay hints from position 2.
|
||||
///
|
||||
/// Only parses `q` tags containing hex event IDs. Tags with event addresses
|
||||
/// (`<kind>:<pubkey>:<d>`) are not currently supported and will return nil.
|
||||
static func from_tag(tag: TagSequence) -> QuoteRef? {
|
||||
var i = tag.makeIterator()
|
||||
|
||||
guard tag.count >= 2,
|
||||
let t0 = i.next(),
|
||||
let key = t0.single_char,
|
||||
key == "q",
|
||||
let t1 = i.next(),
|
||||
let data = t1.id()
|
||||
else { return nil }
|
||||
|
||||
let quoteId = QuoteId(data)
|
||||
let relayHints = tag.relayHints
|
||||
return QuoteRef(quote_id: quoteId, relayHints: relayHints)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
struct Privkey: IdType {
|
||||
let id: Data
|
||||
|
||||
@@ -122,6 +122,11 @@ struct MentionRef: TagKeys, TagConvertible, Equatable, Hashable {
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a tag sequence into a MentionRef, preserving relay hints.
|
||||
///
|
||||
/// Per NIP-01/NIP-10, position 2 in `e`, `p`, and `a` tags contains an optional relay URL.
|
||||
/// When present, this method creates `nevent`/`nprofile`/`naddr` variants that preserve
|
||||
/// the relay hint for later use in event fetching.
|
||||
static func from_tag(tag: TagSequence) -> MentionRef? {
|
||||
guard tag.count >= 2 else { return nil }
|
||||
|
||||
@@ -135,23 +140,35 @@ struct MentionRef: TagKeys, TagConvertible, Equatable, Hashable {
|
||||
return nil
|
||||
}
|
||||
|
||||
let relayHints = tag.relayHints
|
||||
|
||||
switch mention_type {
|
||||
case .p:
|
||||
guard let data = element.id() else { return nil }
|
||||
return .init(nip19: .npub(Pubkey(data)))
|
||||
let pubkey = Pubkey(data)
|
||||
if relayHints.isEmpty {
|
||||
return .init(nip19: .npub(pubkey))
|
||||
}
|
||||
return .init(nip19: .nprofile(NProfile(author: pubkey, relays: relayHints)))
|
||||
case .e:
|
||||
guard let data = element.id() else { return nil }
|
||||
return .init(nip19: .note(NoteId(data)))
|
||||
let noteId = NoteId(data)
|
||||
if relayHints.isEmpty {
|
||||
return .init(nip19: .note(noteId))
|
||||
}
|
||||
#if DEBUG
|
||||
print("[relay-hints] e tag: Found \(relayHints.count) hint(s) for \(noteId.hex().prefix(8))...: \(relayHints.map { $0.absoluteString })")
|
||||
#endif
|
||||
return .init(nip19: .nevent(NEvent(noteid: noteId, relays: relayHints)))
|
||||
case .a:
|
||||
let str = element.string()
|
||||
let data = str.split(separator: ":")
|
||||
if(data.count != 3) { return nil }
|
||||
|
||||
guard data.count == 3 else { return nil }
|
||||
guard let pubkey = Pubkey(hex: String(data[1])) else { return nil }
|
||||
guard let kind = UInt32(data[0]) else { return nil }
|
||||
|
||||
return .init(nip19: .naddr(NAddr(identifier: String(data[2]), author: pubkey, relays: [], kind: kind)))
|
||||
case .r: return .init(nip19: .nrelay(element.string()))
|
||||
return .init(nip19: .naddr(NAddr(identifier: String(data[2]), author: pubkey, relays: relayHints, kind: kind)))
|
||||
case .r:
|
||||
return .init(nip19: .nrelay(element.string()))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -834,6 +834,68 @@ func first_eref_mention(ndb: Ndb, ev: NostrEvent, keypair: Keypair) -> Mention<N
|
||||
})
|
||||
}
|
||||
|
||||
/// Represents a note mention with optional relay hints for fetching.
|
||||
struct NoteMentionWithHints {
|
||||
let noteId: NoteId
|
||||
let relayHints: [RelayURL]
|
||||
let index: Int?
|
||||
}
|
||||
|
||||
/// Finds the first event reference mention in a note's content, preserving relay hints.
|
||||
///
|
||||
/// Per NIP-19, `nevent` bech32 entities may include relay hints. This function extracts
|
||||
/// those hints so they can be used when fetching the referenced event.
|
||||
///
|
||||
/// If no inline mention is found in the content, falls back to checking `q` tags (NIP-10/NIP-18)
|
||||
/// to support quote reposts that don't embed the quoted note inline.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - ndb: The nostrdb instance.
|
||||
/// - ev: The event to search.
|
||||
/// - keypair: The keypair for decryption if needed.
|
||||
/// - Returns: A `NoteMentionWithHints` containing the note ID and relay hints, or nil if not found.
|
||||
func first_eref_mention_with_hints(ndb: Ndb, ev: NostrEvent, keypair: Keypair) -> NoteMentionWithHints? {
|
||||
// First check content blocks for inline mentions
|
||||
let inlineMention: NoteMentionWithHints? = try? NdbBlockGroup.borrowBlockGroup(event: ev, using: ndb, and: keypair, borrow: { blockGroup in
|
||||
return blockGroup.forEachBlock({ index, block in
|
||||
switch block {
|
||||
case .mention(let mention):
|
||||
guard let mentionRef = MentionRef(block: mention) else { return .loopContinue }
|
||||
switch mentionRef.nip19 {
|
||||
case .note(let noteId):
|
||||
return .loopReturn(NoteMentionWithHints(noteId: noteId, relayHints: [], index: index))
|
||||
case .nevent(let nEvent):
|
||||
#if DEBUG
|
||||
if !nEvent.relays.isEmpty {
|
||||
print("[relay-hints] Inline nevent: Found \(nEvent.relays.count) hint(s) for \(nEvent.noteid.hex().prefix(8))...: \(nEvent.relays.map { $0.absoluteString })")
|
||||
}
|
||||
#endif
|
||||
return .loopReturn(NoteMentionWithHints(noteId: nEvent.noteid, relayHints: nEvent.relays, index: index))
|
||||
default:
|
||||
return .loopContinue
|
||||
}
|
||||
default:
|
||||
return .loopContinue
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
if let inlineMention {
|
||||
return inlineMention
|
||||
}
|
||||
|
||||
// Fall back to q tags (NIP-10/NIP-18 quote reposts)
|
||||
guard let quoteRef = ev.referenced_quote_refs.first else {
|
||||
return nil
|
||||
}
|
||||
#if DEBUG
|
||||
if !quoteRef.relayHints.isEmpty {
|
||||
print("[relay-hints] Quote: Found q tag with \(quoteRef.relayHints.count) hint(s) for \(quoteRef.note_id.hex().prefix(8))...: \(quoteRef.relayHints.map { $0.absoluteString })")
|
||||
}
|
||||
#endif
|
||||
return NoteMentionWithHints(noteId: quoteRef.note_id, relayHints: quoteRef.relayHints, index: nil)
|
||||
}
|
||||
|
||||
func separate_invoices(ndb: Ndb, ev: NostrEvent, keypair: Keypair) -> [Invoice]? {
|
||||
return try? NdbBlockGroup.borrowBlockGroup(event: ev, using: ndb, and: keypair, borrow: { blockGroup in
|
||||
let invoiceBlocks: [Invoice] = (try? blockGroup.reduce(initialResult: [Invoice](), { index, invoices, block in
|
||||
|
||||
@@ -52,6 +52,11 @@ class RelayPool {
|
||||
var delegate: Delegate?
|
||||
private(set) var signal: SignalModel = SignalModel()
|
||||
|
||||
/// Tracks active leases on ephemeral relays to prevent premature cleanup.
|
||||
/// Each lookup that uses an ephemeral relay acquires a lease; cleanup only
|
||||
/// happens when the last lease is released.
|
||||
private var ephemeralLeases: [RelayURL: Int] = [:]
|
||||
|
||||
let network_monitor = NWPathMonitor()
|
||||
private let network_monitor_queue = DispatchQueue(label: "io.damus.network_monitor")
|
||||
private var last_network_status: NWPath.Status = .unsatisfied
|
||||
@@ -159,23 +164,77 @@ class RelayPool {
|
||||
Log.debug("Registering %s handler, current: %d", for: .networking, sub_id, self.handlers.count)
|
||||
}
|
||||
|
||||
/// Removes the relay with the given URL from the pool, permanently disables its connection, and ensures it is disconnected.
|
||||
/// - Parameters:
|
||||
/// - relay_id: The RelayURL identifying the relay to disable and remove.
|
||||
@MainActor
|
||||
func remove_relay(_ relay_id: RelayURL) async {
|
||||
var i: Int = 0
|
||||
|
||||
await self.disconnect(to: [relay_id])
|
||||
|
||||
|
||||
for relay in relays {
|
||||
if relay.id == relay_id {
|
||||
relay.connection.disablePermanently()
|
||||
relays.remove(at: i)
|
||||
break
|
||||
}
|
||||
|
||||
|
||||
i += 1
|
||||
}
|
||||
}
|
||||
|
||||
/// Acquires a lease on ephemeral relays to prevent them from being cleaned up
|
||||
/// Increment lease counts for the given ephemeral relay URLs to prevent their removal while leased.
|
||||
/// - Parameters:
|
||||
/// - relayURLs: The relay URLs whose ephemeral lease counts will be incremented; each URL's lease count is increased by one.
|
||||
func acquireEphemeralRelays(_ relayURLs: [RelayURL]) {
|
||||
for url in relayURLs {
|
||||
ephemeralLeases[url, default: 0] += 1
|
||||
#if DEBUG
|
||||
print("[RelayPool] Acquired lease on ephemeral relay \(url.absoluteString), count: \(ephemeralLeases[url] ?? 0)")
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
/// Releases leases on ephemeral relays. When the last lease is released,
|
||||
/// Releases one lease for each specified relay and removes any ephemeral relay when its last lease is released.
|
||||
/// - Parameters:
|
||||
/// - relayURLs: Relay URLs whose leases should be decremented. If a relay's lease count reaches zero and the relay is marked ephemeral, the relay will be removed. Relays not present in the lease table are ignored.
|
||||
func releaseEphemeralRelays(_ relayURLs: [RelayURL]) async {
|
||||
for url in relayURLs {
|
||||
guard let count = ephemeralLeases[url], count > 0 else { continue }
|
||||
|
||||
// Decrement immediately (atomic with respect to this actor, before any suspension)
|
||||
let newCount = count - 1
|
||||
ephemeralLeases[url] = newCount == 0 ? nil : newCount
|
||||
|
||||
#if DEBUG
|
||||
print("[RelayPool] Released lease on ephemeral relay \(url.absoluteString), count: \(newCount)")
|
||||
#endif
|
||||
|
||||
if newCount == 0 {
|
||||
// Check if relay exists and is ephemeral
|
||||
if let relay = await get_relay(url), relay.descriptor.ephemeral {
|
||||
// Re-check: only remove if lease is still nil (not re-acquired during await)
|
||||
guard ephemeralLeases[url] == nil else {
|
||||
#if DEBUG
|
||||
print("[RelayPool] Lease re-acquired during check, skipping removal: \(url.absoluteString)")
|
||||
#endif
|
||||
continue
|
||||
}
|
||||
#if DEBUG
|
||||
print("[RelayPool] Removing ephemeral relay: \(url.absoluteString)")
|
||||
#endif
|
||||
await remove_relay(url)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds and registers a new relay in the pool using the provided descriptor.
|
||||
/// - Parameter desc: Descriptor for the relay to add (includes its URL, metadata, and whether it is ephemeral).
|
||||
/// - Throws: `RelayError.RelayAlreadyExists` if a relay with the same URL is already present in the pool.
|
||||
func add_relay(_ desc: RelayDescriptor) async throws(RelayError) {
|
||||
let relay_id = desc.url
|
||||
if await get_relay(relay_id) != nil {
|
||||
@@ -188,6 +247,16 @@ class RelayPool {
|
||||
case .string(let str) = msg
|
||||
else { return }
|
||||
|
||||
#if DEBUG
|
||||
if desc.ephemeral {
|
||||
if str.hasPrefix("[\"EVENT\"") {
|
||||
print("[RelayPool] Received EVENT from ephemeral relay \(relay_id.absoluteString): \(str.prefix(200))...")
|
||||
} else if str.hasPrefix("[\"EOSE\"") {
|
||||
print("[RelayPool] Received EOSE from ephemeral relay \(relay_id.absoluteString)")
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
let _ = self.ndb?.processEvent(str, originRelayURL: relay_id)
|
||||
self.message_received_function?((str, desc))
|
||||
})
|
||||
@@ -195,11 +264,116 @@ class RelayPool {
|
||||
await self.appendRelayToList(relay: relay)
|
||||
}
|
||||
|
||||
/// Appends the given Relay to the pool's internal list of relays.
|
||||
@MainActor
|
||||
private func appendRelayToList(relay: Relay) {
|
||||
self.relays.append(relay)
|
||||
}
|
||||
|
||||
/// Ensures the given relay URLs are connected, adding them as ephemeral relays if not already in the pool.
|
||||
/// Returns the list of relay URLs that are actually connected (ready for subscriptions).
|
||||
///
|
||||
/// Callers should use `acquireEphemeralRelays` before the lookup and `releaseEphemeralRelays` after.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - relayURLs: The relay URLs to ensure are connected
|
||||
/// - timeout: Maximum time to wait for pending connections (default 2s). Returns early when first relay connects.
|
||||
/// Ensure the given relays are present in the pool and return those that are connected.
|
||||
///
|
||||
/// This will add missing URLs as ephemeral relays, initiate connections for relays that are not connected, and wait up to `timeout` for connections to establish. Once any relay connects, the method allows a short grace period for additional relays to connect before returning.
|
||||
/// - Parameters:
|
||||
/// - relayURLs: The relay URLs to ensure connectivity for. Missing URLs will be added as ephemeral relays.
|
||||
/// - timeout: Maximum time to wait for connections (default: 2 seconds). A short grace period (≈300 ms) is applied after the first relay connects.
|
||||
/// - Returns: The subset of `relayURLs` that are currently connected (includes relays that were already connected and those that became connected during the wait).
|
||||
func ensureConnected(to relayURLs: [RelayURL], timeout: Duration = .seconds(2)) async -> [RelayURL] {
|
||||
var toConnect: [RelayURL] = []
|
||||
var alreadyConnected: [RelayURL] = []
|
||||
|
||||
for url in relayURLs {
|
||||
if let existing = await get_relay(url) {
|
||||
if existing.connection.isConnected {
|
||||
alreadyConnected.append(url)
|
||||
#if DEBUG
|
||||
print("[RelayPool] Relay \(url.absoluteString) already connected")
|
||||
#endif
|
||||
} else {
|
||||
toConnect.append(url)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
let descriptor = RelayDescriptor(url: url, info: .readWrite, variant: .ephemeral)
|
||||
do {
|
||||
try await add_relay(descriptor)
|
||||
toConnect.append(url)
|
||||
#if DEBUG
|
||||
print("[RelayPool] Added ephemeral relay: \(url.absoluteString)")
|
||||
#endif
|
||||
} catch {
|
||||
#if DEBUG
|
||||
print("[RelayPool] Failed to add relay \(url.absoluteString): \(error)")
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
guard !toConnect.isEmpty else { return alreadyConnected }
|
||||
|
||||
await connect(to: toConnect)
|
||||
|
||||
let checkInterval: Duration = .milliseconds(50)
|
||||
let overallDeadline = ContinuousClock.now + timeout
|
||||
var graceDeadline: ContinuousClock.Instant? = alreadyConnected.isEmpty ? nil : ContinuousClock.now + .milliseconds(300)
|
||||
|
||||
// Wait for relays to connect. Once the first connects, start a grace period for others.
|
||||
waitLoop: while ContinuousClock.now < overallDeadline {
|
||||
do {
|
||||
try await Task.sleep(for: checkInterval)
|
||||
} catch {
|
||||
break
|
||||
}
|
||||
|
||||
// Check if any relay has connected
|
||||
var anyConnected = false
|
||||
for url in toConnect {
|
||||
if let relay = await get_relay(url), relay.connection.isConnected {
|
||||
anyConnected = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if anyConnected && graceDeadline == nil {
|
||||
// Start grace period on first connection
|
||||
graceDeadline = ContinuousClock.now + .milliseconds(300)
|
||||
}
|
||||
|
||||
// Exit once grace period expires (check every iteration if deadline is set)
|
||||
if let deadline = graceDeadline, ContinuousClock.now >= deadline {
|
||||
break waitLoop
|
||||
}
|
||||
}
|
||||
|
||||
// Collect all connected relays
|
||||
var connected = alreadyConnected
|
||||
for url in toConnect {
|
||||
if let relay = await get_relay(url), relay.connection.isConnected {
|
||||
connected.append(url)
|
||||
#if DEBUG
|
||||
print("[RelayPool] Relay \(url.absoluteString) connected: true")
|
||||
#endif
|
||||
} else {
|
||||
#if DEBUG
|
||||
print("[RelayPool] Relay \(url.absoluteString) connected: false (excluded)")
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
return connected
|
||||
}
|
||||
|
||||
/// Attaches a `RelayLog` to the connection for the specified relay and records the current network status in the log.
|
||||
/// - Parameters:
|
||||
/// - log: The `RelayLog` instance to attach to the relay's connection.
|
||||
/// - relay_id: The `RelayURL` identifying the relay whose connection will receive the log.
|
||||
func setLog(_ log: RelayLog, for relay_id: RelayURL) async {
|
||||
// add the current network state to the log
|
||||
log.add("Network state: \(network_monitor.currentPath.status)")
|
||||
@@ -252,9 +426,22 @@ class RelayPool {
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets relays matching the provided relay URLs, or all relays when no targets are specified.
|
||||
/// - Parameter targetRelays: Optional list of relay URLs to filter by. If `nil`, the pool's full relay list is returned.
|
||||
/// - Returns: An array of `Relay` instances corresponding to the requested URLs; any requested URL not present in the pool is omitted from the result.
|
||||
@MainActor
|
||||
func getRelays(targetRelays: [RelayURL]? = nil) -> [Relay] {
|
||||
targetRelays.map{ get_relays($0) } ?? self.relays
|
||||
let result = targetRelays.map{ get_relays($0) } ?? self.relays
|
||||
#if DEBUG
|
||||
if let targets = targetRelays {
|
||||
let found = result.map { $0.descriptor.url.absoluteString }
|
||||
let requested = targets.map { $0.absoluteString }
|
||||
if found.count != targets.count {
|
||||
print("[RelayPool] getRelays: MISMATCH! requested=\(requested) but found=\(found)")
|
||||
}
|
||||
}
|
||||
#endif
|
||||
return result
|
||||
}
|
||||
|
||||
/// Deletes queued up requests that should not persist between app sessions (i.e. when the app goes to background then back to foreground)
|
||||
@@ -305,10 +492,22 @@ class RelayPool {
|
||||
/// - filters: The filters specifying the desired content.
|
||||
/// - desiredRelays: The desired relays which to subsctibe to. If `nil`, it defaults to the `RelayPool`'s default list
|
||||
/// - eoseTimeout: The maximum timeout which to give up waiting for the eoseSignal
|
||||
/// - Returns: Returns an async stream that callers can easily consume via a for-loop
|
||||
/// Open a subscription for the given filters and provide a stream of matching items and EOSE notifications.
|
||||
/// - Parameters:
|
||||
/// - filters: The list of NostrFilter objects that define which events to receive.
|
||||
/// - desiredRelays: Optional list of RelayURL to subscribe to; when `nil` the pool's relays are used.
|
||||
/// - eoseTimeout: Optional timeout to wait before emitting an EOSE if not all relays have reported EOSE; defaults to 5 seconds.
|
||||
/// - id: Optional UUID to use as the subscription identifier; a new UUID is generated when `nil`.
|
||||
/// - Returns: An AsyncStream that yields StreamItem values representing matched events and end-of-stream (EOSE) notifications for this subscription. The stream deduplicates events by their NoteId. When the stream terminates it will unsubscribe from the chosen relays and remove the internal handler.
|
||||
func subscribe(filters: [NostrFilter], to desiredRelays: [RelayURL]? = nil, eoseTimeout: Duration? = nil, id: UUID? = nil) async -> AsyncStream<StreamItem> {
|
||||
let eoseTimeout = eoseTimeout ?? .seconds(5)
|
||||
let desiredRelays = await getRelays(targetRelays: desiredRelays)
|
||||
#if DEBUG
|
||||
print("[RelayPool] subscribe: requested=\(desiredRelays.map { $0.descriptor.url.absoluteString }), pool has \(await relays.count) relays")
|
||||
if let ids = filters.first?.ids {
|
||||
print("[RelayPool] subscribe: filter ids=\(ids.map { $0.hex() })")
|
||||
}
|
||||
#endif
|
||||
let startTime = CFAbsoluteTimeGetCurrent()
|
||||
return AsyncStream<StreamItem> { continuation in
|
||||
let id = id ?? UUID()
|
||||
@@ -439,6 +638,13 @@ class RelayPool {
|
||||
}
|
||||
}
|
||||
|
||||
/// Dispatches a Nostr request to the pool's matching relays, writing a local copy to the NostrDB and queuing the request for any relay that is not currently connected.
|
||||
///
|
||||
/// Filters target relays by their read/write capabilities and, optionally, by ephemeral status; connected relays receive the request immediately and disconnected relays have the request queued for later delivery. Sent messages are reported via `message_sent_function` when available.
|
||||
/// - Parameters:
|
||||
/// - req: The Nostr request to send.
|
||||
/// - to: Optional list of relay URLs to restrict delivery to; `nil` targets the pool's default set of relays.
|
||||
/// - skip_ephemeral: If `true`, skip ephemeral relays when sending the request.
|
||||
func send_raw(_ req: NostrRequestType, to: [RelayURL]? = nil, skip_ephemeral: Bool = true) async {
|
||||
let relays = await getRelays(targetRelays: to)
|
||||
|
||||
@@ -463,6 +669,11 @@ class RelayPool {
|
||||
}
|
||||
|
||||
relay.connection.send(req, callback: { str in
|
||||
#if DEBUG
|
||||
if relay.descriptor.ephemeral && str.hasPrefix("[\"REQ\"") {
|
||||
print("[RelayPool] Sending REQ to ephemeral relay \(relay.id.absoluteString): \(str)")
|
||||
}
|
||||
#endif
|
||||
self.message_sent_function?((str, relay))
|
||||
})
|
||||
}
|
||||
@@ -699,4 +910,3 @@ extension RelayPool {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -27,19 +27,23 @@ enum Marker: String {
|
||||
}
|
||||
}
|
||||
|
||||
/// A reference to a note event, with optional relay hint, marker, and author pubkey.
|
||||
/// Per NIP-10: `["e", <event-id>, <relay-url>, <marker>, <pubkey>]`
|
||||
struct NoteRef: IdType, TagConvertible, Equatable {
|
||||
let note_id: NoteId
|
||||
let relay: String?
|
||||
let marker: Marker?
|
||||
let pubkey: Pubkey?
|
||||
|
||||
var id: Data {
|
||||
self.note_id.id
|
||||
}
|
||||
|
||||
init(note_id: NoteId, relay: String? = nil, marker: Marker? = nil) {
|
||||
init(note_id: NoteId, relay: String? = nil, marker: Marker? = nil, pubkey: Pubkey? = nil) {
|
||||
self.note_id = note_id
|
||||
self.relay = relay
|
||||
self.marker = marker
|
||||
self.pubkey = pubkey
|
||||
}
|
||||
|
||||
static func note_id(_ note_id: NoteId) -> NoteRef {
|
||||
@@ -50,19 +54,26 @@ struct NoteRef: IdType, TagConvertible, Equatable {
|
||||
self.note_id = NoteId(data)
|
||||
self.relay = nil
|
||||
self.marker = nil
|
||||
self.pubkey = nil
|
||||
}
|
||||
|
||||
/// Generates a tag array per NIP-10: `["e", <event-id>, <relay-url>, <marker>, <pubkey>]`
|
||||
var tag: [String] {
|
||||
var t = ["e", self.hex()]
|
||||
if let marker {
|
||||
t.append(relay ?? "")
|
||||
t.append(marker.rawValue)
|
||||
if let pubkey {
|
||||
t.append(pubkey.hex())
|
||||
}
|
||||
} else if let relay {
|
||||
t.append(relay)
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
/// Parses a NoteRef from a tag per NIP-10: `["e", <event-id>, <relay-url>, <marker>, <pubkey>]`
|
||||
/// Only parses pubkey from position 4 when a valid marker is present in position 3.
|
||||
static func from_tag(tag: TagSequence) -> NoteRef? {
|
||||
guard tag.count >= 2 else { return nil }
|
||||
|
||||
@@ -78,14 +89,19 @@ struct NoteRef: IdType, TagConvertible, Equatable {
|
||||
|
||||
var relay: String? = nil
|
||||
var marker: Marker? = nil
|
||||
var pubkey: Pubkey? = nil
|
||||
|
||||
if tag.count >= 3, let r = i.next() {
|
||||
relay = r.string()
|
||||
if tag.count >= 4, let m = i.next() {
|
||||
marker = Marker(m)
|
||||
// Only parse pubkey when marker is recognized per NIP-10
|
||||
if marker != nil, tag.count >= 5, let pk = i.next(), let pubkeyData = pk.id() {
|
||||
pubkey = Pubkey(pubkeyData)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return NoteRef(note_id: note_id, relay: relay, marker: marker)
|
||||
return NoteRef(note_id: note_id, relay: relay, marker: marker, pubkey: pubkey)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user