Add note provenance filter support to SubscriptionManager
Closes: https://github.com/damus-io/damus/issues/3222 Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
@@ -137,7 +137,13 @@ extension NostrNetworkManager {
|
|||||||
case .event(let noteKey):
|
case .event(let noteKey):
|
||||||
let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey)
|
let lender = NdbNoteLender(ndb: self.ndb, noteKey: noteKey)
|
||||||
try Task.checkCancellation()
|
try Task.checkCancellation()
|
||||||
continuation.yield(.event(lender: lender))
|
guard let desiredRelays else {
|
||||||
|
continuation.yield(.event(lender: lender)) // If no desired relays are specified, return all notes we see.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if try ndb.was(noteKey: noteKey, seenOnAnyOf: desiredRelays) {
|
||||||
|
continuation.yield(.event(lender: lender)) // If desired relays were specified and this note was seen there, return it.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -141,7 +141,7 @@ class RelayPool {
|
|||||||
case .string(let str) = msg
|
case .string(let str) = msg
|
||||||
else { return }
|
else { return }
|
||||||
|
|
||||||
let _ = self.ndb.process_event(str)
|
let _ = self.ndb.process_event(str, originRelayURL: relay_id)
|
||||||
self.message_received_function?((str, desc))
|
self.message_received_function?((str, desc))
|
||||||
})
|
})
|
||||||
let relay = Relay(descriptor: desc, connection: conn)
|
let relay = Relay(descriptor: desc, connection: conn)
|
||||||
|
|||||||
@@ -51,14 +51,10 @@ class SearchHomeModel: ObservableObject {
|
|||||||
var follow_list_filter = NostrFilter(kinds: [.follow_list])
|
var follow_list_filter = NostrFilter(kinds: [.follow_list])
|
||||||
follow_list_filter.until = UInt32(Date.now.timeIntervalSince1970)
|
follow_list_filter.until = UInt32(Date.now.timeIntervalSince1970)
|
||||||
|
|
||||||
outerLoop: for await item in damus_state.nostrNetwork.reader.subscribe(filters: [get_base_filter(), follow_list_filter], to: to_relays) {
|
for await noteLender in damus_state.nostrNetwork.reader.streamNotesUntilEndOfStoredEvents(filters: [get_base_filter(), follow_list_filter], to: to_relays) {
|
||||||
switch item {
|
await noteLender.justUseACopy({ await self.handleEvent($0) })
|
||||||
case .event(let lender):
|
|
||||||
await lender.justUseACopy({ await self.handleEvent($0) })
|
|
||||||
case .eose:
|
|
||||||
break outerLoop
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DispatchQueue.main.async {
|
DispatchQueue.main.async {
|
||||||
self.loading = false
|
self.loading = false
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,4 +27,13 @@ extension Ndb {
|
|||||||
}
|
}
|
||||||
return try self.subscribe(filters: ndbFilters, maxSimultaneousResults: maxSimultaneousResults)
|
return try self.subscribe(filters: ndbFilters, maxSimultaneousResults: maxSimultaneousResults)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Determines if a given note was seen on any of the listed relay URLs
|
||||||
|
func was(noteKey: NoteKey, seenOnAnyOf relayUrls: [RelayURL], txn: SafeNdbTxn<()>? = nil) throws -> Bool {
|
||||||
|
return try self.was(noteKey: noteKey, seenOnAnyOf: relayUrls.map({ $0.absoluteString }), txn: txn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func process_event(_ str: String, originRelayURL: RelayURL? = nil) -> Bool {
|
||||||
|
self.process_event(str, originRelayURL: originRelayURL?.absoluteString)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -563,12 +563,22 @@ class Ndb {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func process_event(_ str: String) -> Bool {
|
func process_event(_ str: String, originRelayURL: String? = nil) -> Bool {
|
||||||
guard !is_closed else { return false }
|
guard !is_closed else { return false }
|
||||||
|
guard let originRelayURL else {
|
||||||
return str.withCString { cstr in
|
return str.withCString { cstr in
|
||||||
return ndb_process_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
|
return ndb_process_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return str.withCString { cstr in
|
||||||
|
return originRelayURL.withCString { originRelayCString in
|
||||||
|
let meta = UnsafeMutablePointer<ndb_ingest_meta>.allocate(capacity: 1)
|
||||||
|
defer { meta.deallocate() }
|
||||||
|
ndb_ingest_meta_init(meta, 0, originRelayCString)
|
||||||
|
return ndb_process_event_with(ndb.ndb, cstr, Int32(str.utf8.count), meta) != 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func process_events(_ str: String) -> Bool {
|
func process_events(_ str: String) -> Bool {
|
||||||
guard !is_closed else { return false }
|
guard !is_closed else { return false }
|
||||||
@@ -805,6 +815,25 @@ class Ndb {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Determines if a given note was seen on a specific relay URL
|
||||||
|
func was(noteKey: NoteKey, seenOn relayUrl: String, txn: SafeNdbTxn<()>? = nil) throws -> Bool {
|
||||||
|
guard let txn = txn ?? SafeNdbTxn.new(on: self) else { throw NdbLookupError.cannotOpenTransaction }
|
||||||
|
return relayUrl.withCString({ relayCString in
|
||||||
|
return ndb_note_seen_on_relay(&txn.txn, noteKey, relayCString) == 1
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Determines if a given note was seen on any of the listed relay URLs
|
||||||
|
func was(noteKey: NoteKey, seenOnAnyOf relayUrls: [String], txn: SafeNdbTxn<()>? = nil) throws -> Bool {
|
||||||
|
guard let txn = txn ?? SafeNdbTxn.new(on: self) else { throw NdbLookupError.cannotOpenTransaction }
|
||||||
|
for relayUrl in relayUrls {
|
||||||
|
if try self.was(noteKey: noteKey, seenOn: relayUrl, txn: txn) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// MARK: Internal ndb callback interfaces
|
// MARK: Internal ndb callback interfaces
|
||||||
|
|
||||||
internal func setCallback(for subscriptionId: UInt64, callback: @escaping (NoteKey) -> Void) async {
|
internal func setCallback(for subscriptionId: UInt64, callback: @escaping (NoteKey) -> Void) async {
|
||||||
@@ -893,6 +922,11 @@ extension Ndb {
|
|||||||
case streamError(NdbStreamError)
|
case streamError(NdbStreamError)
|
||||||
case internalInconsistency
|
case internalInconsistency
|
||||||
case timeout
|
case timeout
|
||||||
|
case notFound
|
||||||
|
}
|
||||||
|
|
||||||
|
enum OperationError: Error {
|
||||||
|
case genericError
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -384,7 +384,12 @@ class NdbNote: Codable, Equatable, Hashable {
|
|||||||
// Extension to make NdbNote compatible with NostrEvent's original API
|
// Extension to make NdbNote compatible with NostrEvent's original API
|
||||||
extension NdbNote {
|
extension NdbNote {
|
||||||
var is_textlike: Bool {
|
var is_textlike: Bool {
|
||||||
return kind == 1 || kind == 42 || kind == 30023 || kind == 9802 || kind == 39089
|
switch known_kind {
|
||||||
|
case .text, .chat, .longform, .highlight:
|
||||||
|
true
|
||||||
|
default:
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var is_quote_repost: NoteId? {
|
var is_quote_repost: NoteId? {
|
||||||
|
|||||||
Reference in New Issue
Block a user