// // Ndb.swift // damus // // Created by William Casarin on 2023-08-25. // import Foundation import OSLog fileprivate let APPLICATION_GROUP_IDENTIFIER = "group.com.damus" enum NdbSearchOrder { case oldest_first case newest_first } enum DatabaseError: Error { case failed_open var errorDescription: String? { switch self { case .failed_open: return "Failed to open database" } } } class Ndb { var ndb: ndb_t let path: String? let owns_db: Bool var generation: Int private var closed: Bool private var callbackHandler: Ndb.CallbackHandler private static let DEFAULT_WRITER_SCRATCH_SIZE: Int32 = 2097152; // 2mb scratch size for the writer thread, it should match with the one specified in nostrdb.c var is_closed: Bool { self.closed || self.ndb.ndb == nil } static func safemode() -> Ndb? { guard let path = db_path ?? old_db_path else { return nil } // delete the database and start fresh if Self.db_files_exist(path: path) { let file_manager = FileManager.default for db_file in db_files { try? file_manager.removeItem(atPath: "\(path)/\(db_file)") } } guard let ndb = Ndb(path: path) else { return nil } return ndb } // NostrDB used to be stored on the app container's document directory static private var old_db_path: String? { guard let path = FileManager.default.urls(for: .documentDirectory, in: .userDomainMask).first?.absoluteString else { return nil } return remove_file_prefix(path) } static var db_path: String? { // Use the `group.com.damus` container, so that it can be accessible from other targets // e.g. The notification service extension needs to access Ndb data, which is done through this shared file container. guard let containerURL = FileManager.default.containerURL(forSecurityApplicationGroupIdentifier: APPLICATION_GROUP_IDENTIFIER) else { return nil } return remove_file_prefix(containerURL.absoluteString) } static private var db_files: [String] = ["data.mdb", "lock.mdb"] static var empty: Ndb { print("txn: NOSTRDB EMPTY") return Ndb(ndb: ndb_t(ndb: nil)) } static func open(path: String? = nil, owns_db_file: Bool = true, callbackHandler: Ndb.CallbackHandler) -> ndb_t? { var ndb_p: OpaquePointer? = nil let ingest_threads: Int32 = 4 var mapsize: Int = 1024 * 1024 * 1024 * 32 if path == nil && owns_db_file { // `nil` path indicates the default path will be used. // The default path changed over time, so migrate the database to the new location if needed do { try Self.migrate_db_location_if_needed() } catch { // If it fails to migrate, the app can still run without serious consequences. Log instead. Log.error("Error migrating NostrDB to new file container", for: .storage) } } guard let db_path = Self.db_path, owns_db_file || Self.db_files_exist(path: db_path) else { return nil // If the caller claims to not own the DB file, and the DB files do not exist, then we should not initialize Ndb } guard let path = path.map(remove_file_prefix) ?? Ndb.db_path else { return nil } let ok = path.withCString { testdir in var ok = false while !ok && mapsize > 1024 * 1024 * 700 { var cfg = ndb_config(flags: 0, ingester_threads: ingest_threads, writer_scratch_buffer_size: DEFAULT_WRITER_SCRATCH_SIZE, mapsize: mapsize, filter_context: nil, ingest_filter: nil, sub_cb_ctx: nil, sub_cb: nil) // Here we hook up the global callback function for subscription callbacks. // We do an "unretained" pass here because the lifetime of the callback handler is larger than the lifetime of the nostrdb monitor in the C code. // The NostrDB monitor that makes the callbacks should in theory _never_ outlive the callback handler. // // This means that: // - for as long as nostrdb is running, its parent Ndb instance will be alive, keeping the callback handler alive. // - when the Ndb instance is deinitialized — and the callback handler comes down with it — the `deinit` function will destroy the nostrdb monitor, preventing it from accessing freed memory. // // Therefore, we do not need to increase reference count to callbackHandler. The tightly coupled lifetimes will ensure that it is always alive when the ndb_monitor is alive. let ctx: UnsafeMutableRawPointer = Unmanaged.passUnretained(callbackHandler).toOpaque() ndb_config_set_subscription_callback(&cfg, subscription_callback, ctx) let res = ndb_init(&ndb_p, testdir, &cfg); ok = res != 0; if !ok { Log.error("ndb_init failed: %d, reducing mapsize from %d to %d", for: .storage, res, mapsize, mapsize / 2) mapsize /= 2 } } return ok } if !ok { return nil } let ndb_instance = ndb_t(ndb: ndb_p) Task { await callbackHandler.set(ndb: ndb_instance) } return ndb_instance } init?(path: String? = nil, owns_db_file: Bool = true) { let callbackHandler = Ndb.CallbackHandler() guard let db = Self.open(path: path, owns_db_file: owns_db_file, callbackHandler: callbackHandler) else { return nil } self.generation = 0 self.path = path self.owns_db = owns_db_file self.ndb = db self.closed = false self.callbackHandler = callbackHandler } private static func migrate_db_location_if_needed() throws { guard let old_db_path, let db_path else { throw Errors.cannot_find_db_path } let file_manager = FileManager.default let old_db_files_exist = Self.db_files_exist(path: old_db_path) let new_db_files_exist = Self.db_files_exist(path: db_path) // Migration rules: // 1. If DB files exist in the old path but not the new one, move files to the new path // 2. If files do not exist anywhere, do nothing (let new DB be initialized) // 3. If files exist in the new path, but not the old one, nothing needs to be done // 4. If files exist on both, do nothing. // Scenario 4 likely means that user has downgraded and re-upgraded. // Although it might make sense to get the most recent DB, it might lead to data loss. // If we leave both intact, it makes it easier to fix later, as no data loss would occur. if old_db_files_exist && !new_db_files_exist { Log.info("Migrating NostrDB to new file location…", for: .storage) do { try db_files.forEach { db_file in let old_path = "\(old_db_path)/\(db_file)" let new_path = "\(db_path)/\(db_file)" try file_manager.moveItem(atPath: old_path, toPath: new_path) } Log.info("NostrDB files successfully migrated to the new location", for: .storage) } catch { throw Errors.db_file_migration_error } } } private static func db_files_exist(path: String) -> Bool { return db_files.allSatisfy { FileManager.default.fileExists(atPath: "\(path)/\($0)") } } init(ndb: ndb_t) { self.ndb = ndb self.generation = 0 self.path = nil self.owns_db = true self.closed = false // This simple initialization will cause subscriptions not to be ever called. Probably fine because this initializer is used only for empty example ndb instances. self.callbackHandler = Ndb.CallbackHandler() } /// Mark NostrDB as "closed" without actually closing it. /// Useful when shutting down tasks that use NostrDB while avoiding new tasks from using it. func markClosed() { self.closed = true } func close() { guard !self.is_closed else { return } self.closed = true print("txn: CLOSING NOSTRDB") ndb_destroy(self.ndb.ndb) self.generation += 1 print("txn: NOSTRDB CLOSED") } func reopen() -> Bool { guard self.is_closed, let db = Self.open(path: self.path, owns_db_file: self.owns_db, callbackHandler: self.callbackHandler) else { return false } print("txn: NOSTRDB REOPENED (gen \(generation))") self.closed = false self.ndb = db return true } // GH_3245 TODO: This is a low level call, make it hidden from outside Ndb internal func lookup_blocks_by_key_with_txn(_ key: NoteKey, txn: RawNdbTxnAccessible) -> NdbBlockGroup.BlocksMetadata? { guard let blocks = ndb_get_blocks_by_key(self.ndb.ndb, &txn.txn, key) else { return nil } return NdbBlockGroup.BlocksMetadata(ptr: blocks) } func lookup_blocks_by_key(_ key: NoteKey, borrow lendingFunction: (_: borrowing NdbBlockGroup.BlocksMetadata?) throws -> T) rethrows -> T { let txn = SafeNdbTxn.new(on: self) { txn in lookup_blocks_by_key_with_txn(key, txn: txn) } guard let txn else { return try lendingFunction(nil) } return try lendingFunction(txn.val) } private func lookup_note_by_key_with_txn(_ key: NoteKey, txn: NdbTxn) -> NdbNote? { var size: Int = 0 guard let note_p = ndb_get_note_by_key(&txn.txn, key, &size) else { return nil } let ptr = ndb_note_ptr(ptr: note_p) return NdbNote(note: ptr, size: size, owned: false, key: key) } func text_search(query: String, limit: Int = 128, order: NdbSearchOrder = .newest_first) -> [NoteKey] { guard let txn = NdbTxn(ndb: self) else { return [] } var results = ndb_text_search_results() let res = query.withCString { q in let order = order == .newest_first ? NDB_ORDER_DESCENDING : NDB_ORDER_ASCENDING var config = ndb_text_search_config(order: order, limit: Int32(limit)) return ndb_text_search(&txn.txn, q, &results, &config) } if res == 0 { return [] } var note_ids = [NoteKey]() for i in 0..(_ key: NoteKey, borrow lendingFunction: (_: borrowing UnownedNdbNote?) throws -> T) rethrows -> T { let txn = NdbTxn(ndb: self) { txn in lookup_note_by_key_with_txn(key, txn: txn) } guard let rawNote = txn?.unsafeUnownedValue else { return try lendingFunction(nil) } let unownedNote = UnownedNdbNote(rawNote) return try lendingFunction(.some(unownedNote)) } func lookup_note_by_key_and_copy(_ key: NoteKey) -> NdbNote? { return lookup_note_by_key(key, borrow: { maybeUnownedNote -> NdbNote? in switch maybeUnownedNote { case .none: return nil case .some(let unownedNote): return unownedNote.toOwned() } }) } private func lookup_profile_by_key_inner(_ key: ProfileKey, txn: RawNdbTxnAccessible) -> ProfileRecord? { var size: Int = 0 guard let profile_p = ndb_get_profile_by_key(&txn.txn, key, &size) else { return nil } return profile_flatbuf_to_record(ptr: profile_p, size: size, key: key) } private func profile_flatbuf_to_record(ptr: UnsafeMutableRawPointer, size: Int, key: UInt64) -> ProfileRecord? { do { var buf = ByteBuffer(assumingMemoryBound: ptr, capacity: size) let rec: NdbProfileRecord = try getDebugCheckedRoot(byteBuffer: &buf) return ProfileRecord(data: rec, key: key) } catch { // Handle error appropriately print("UNUSUAL: \(error)") return nil } } private func lookup_note_with_txn_inner(id: NoteId, txn: NdbTxn) -> NdbNote? { return id.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> NdbNote? in var key: UInt64 = 0 var size: Int = 0 guard let baseAddress = ptr.baseAddress, let note_p = ndb_get_note_by_id(&txn.txn, baseAddress, &size, &key) else { return nil } let ptr = ndb_note_ptr(ptr: note_p) return NdbNote(note: ptr, size: size, owned: false, key: key) } } private func lookup_profile_with_txn_inner(pubkey: Pubkey, txn: some RawNdbTxnAccessible) -> ProfileRecord? { var record: ProfileRecord? = nil pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) in var size: Int = 0 var key: UInt64 = 0 guard let baseAddress = ptr.baseAddress, let profile_p = ndb_get_profile_by_pubkey(&txn.txn, baseAddress, &size, &key) else { return } record = profile_flatbuf_to_record(ptr: profile_p, size: size, key: key) } return record } private func lookup_profile_by_key_with_txn(key: ProfileKey, txn: RawNdbTxnAccessible) -> ProfileRecord? { lookup_profile_by_key_inner(key, txn: txn) } func lookup_profile_by_key(key: ProfileKey, borrow lendingFunction: (_: borrowing ProfileRecord?) throws -> T) rethrows -> T { let txn = SafeNdbTxn.new(on: self) { txn in return lookup_profile_by_key_inner(key, txn: txn) } guard let txn else { return try lendingFunction(nil) } return try lendingFunction(txn.val) } private func lookup_note_with_txn(id: NoteId, txn: NdbTxn) -> NdbNote? { lookup_note_with_txn_inner(id: id, txn: txn) } func lookup_profile_key(_ pubkey: Pubkey) -> ProfileKey? { guard let txn = NdbTxn(ndb: self, with: { txn in lookup_profile_key_with_txn(pubkey, txn: txn) }) else { return nil } return txn.value } private func lookup_profile_key_with_txn(_ pubkey: Pubkey, txn: NdbTxn) -> ProfileKey? { return pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> NoteKey? in guard let p = ptr.baseAddress else { return nil } let r = ndb_get_profilekey_by_pubkey(&txn.txn, p) if r == 0 { return nil } return r } } // GH_3245 TODO: This is a low level call, make it hidden from outside Ndb internal func lookup_note_key_with_txn(_ id: NoteId, txn: some RawNdbTxnAccessible) -> NoteKey? { guard !closed else { return nil } return id.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> NoteKey? in guard let p = ptr.baseAddress else { return nil } let r = ndb_get_notekey_by_id(&txn.txn, p) if r == 0 { return nil } return r } } func lookup_note_key(_ id: NoteId) -> NoteKey? { guard let txn = NdbTxn(ndb: self, with: { txn in lookup_note_key_with_txn(id, txn: txn) }) else { return nil } return txn.value } func lookup_note(_ id: NoteId, borrow lendingFunction: (_: borrowing UnownedNdbNote?) throws -> T) rethrows -> T { let txn = NdbTxn(ndb: self) { txn in lookup_note_with_txn_inner(id: id, txn: txn) } guard let rawNote = txn?.unsafeUnownedValue else { return try lendingFunction(nil) } return try lendingFunction(UnownedNdbNote(rawNote)) } func lookup_note_and_copy(_ id: NoteId) -> NdbNote? { return self.lookup_note(id, borrow: { unownedNote in return unownedNote?.toOwned() }) } func lookup_profile(_ pubkey: Pubkey, borrow lendingFunction: (_: borrowing ProfileRecord?) throws -> T) rethrows -> T { let txn = SafeNdbTxn.new(on: self) { txn in lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn) } guard let txn else { return try lendingFunction(nil) } return try lendingFunction(txn.val) } func lookup_profile_lnurl(_ pubkey: Pubkey) -> String? { return lookup_profile(pubkey, borrow: { pr in switch pr { case .none: return nil case .some(let pr): return pr.lnurl } }) } func lookup_profile_and_copy(_ pubkey: Pubkey) -> Profile? { return self.lookup_profile(pubkey, borrow: { pr in switch pr { case .some(let pr): return pr.profile case .none: return nil } }) } private func lookup_profile_with_txn(_ pubkey: Pubkey, txn: NdbTxn) -> ProfileRecord? { lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn) } func process_client_event(_ str: String) -> Bool { guard !self.is_closed else { return false } return str.withCString { cstr in return ndb_process_client_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0 } } func write_profile_last_fetched(pubkey: Pubkey, fetched_at: UInt64) { guard !closed else { return } let _ = pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> () in guard let p = ptr.baseAddress else { return } ndb_write_last_profile_fetch(ndb.ndb, p, fetched_at) } } private func read_profile_last_fetched(txn: NdbTxn, pubkey: Pubkey) -> UInt64? { guard !closed else { return nil } return pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> UInt64? in guard let p = ptr.baseAddress else { return nil } let res = ndb_read_last_profile_fetch(&txn.txn, p) if res == 0 { return nil } return res } } func read_profile_last_fetched(pubkey: Pubkey) -> UInt64? { var last_fetched: UInt64? = nil let _ = NdbTxn(ndb: self) { txn in last_fetched = read_profile_last_fetched(txn: txn, pubkey: pubkey) } return last_fetched } func process_event(_ str: String, originRelayURL: String? = nil) -> Bool { guard !is_closed else { return false } guard let originRelayURL else { return str.withCString { cstr in return ndb_process_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0 } } return str.withCString { cstr in return originRelayURL.withCString { originRelayCString in let meta = UnsafeMutablePointer.allocate(capacity: 1) defer { meta.deallocate() } ndb_ingest_meta_init(meta, 0, originRelayCString) return ndb_process_event_with(ndb.ndb, cstr, Int32(str.utf8.count), meta) != 0 } } } func process_events(_ str: String) -> Bool { guard !is_closed else { return false } return str.withCString { cstr in return ndb_process_events(ndb.ndb, cstr, str.utf8.count) != 0 } } func search_profile(_ search: String, limit: Int) -> [Pubkey] { guard let txn = NdbTxn<()>.init(ndb: self) else { return [] } return search_profile(search, limit: limit, txn: txn) } private func search_profile(_ search: String, limit: Int, txn: NdbTxn) -> [Pubkey] { var pks = Array() return search.withCString { q in var s = ndb_search() guard ndb_search_profile(&txn.txn, &s, q) != 0 else { return pks } defer { ndb_search_profile_end(&s) } pks.append(Pubkey(Data(bytes: &s.key.pointee.id.0, count: 32))) var n = limit while n > 0 { guard ndb_search_profile_next(&s) != 0 else { return pks } pks.append(Pubkey(Data(bytes: &s.key.pointee.id.0, count: 32))) n -= 1 } return pks } } // MARK: NdbFilter queries and subscriptions func query(filters: [NdbFilter], maxResults: Int) throws(NdbStreamError) -> [NoteKey] { guard let txn = NdbTxn(ndb: self) else { return [] } return try query(with: txn, filters: filters, maxResults: maxResults) } /// Safe wrapper around the `ndb_query` C function /// - Parameters: /// - txn: Database transaction /// - filters: Array of NdbFilter objects /// - maxResults: Maximum number of results to return /// - Returns: Array of note keys matching the filters /// - Throws: NdbStreamError if the query fails private func query(with txn: NdbTxn, filters: [NdbFilter], maxResults: Int) throws(NdbStreamError) -> [NoteKey] { guard !self.is_closed else { throw .ndbClosed } let filtersPointer = UnsafeMutablePointer.allocate(capacity: filters.count) defer { filtersPointer.deallocate() } for (index, ndbFilter) in filters.enumerated() { filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter } let count = UnsafeMutablePointer.allocate(capacity: 1) defer { count.deallocate() } let results = UnsafeMutablePointer.allocate(capacity: maxResults) defer { results.deallocate() } guard !self.is_closed else { throw .ndbClosed } guard ndb_query(&txn.txn, filtersPointer, Int32(filters.count), results, Int32(maxResults), count) == 1 else { throw NdbStreamError.initialQueryFailed } var noteIds: [NoteKey] = [] for i in 0.. AsyncStream { return AsyncStream { continuation in // Allocate filters pointer - will be deallocated when subscription ends // Cannot use `defer` to deallocate `filtersPointer` because it needs to remain valid for the lifetime of the subscription, which extends beyond this block's scope. let filtersPointer = UnsafeMutablePointer.allocate(capacity: filters.count) for (index, ndbFilter) in filters.enumerated() { filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter } var streaming = true var subid: UInt64 = 0 var terminationStarted = false // Set up termination handler continuation.onTermination = { @Sendable _ in guard !terminationStarted else { return } // Avoid race conditions between two termination closures terminationStarted = true Log.debug("ndb_wait: stream: Terminated early", for: .ndb) streaming = false // Clean up resources on early termination if subid != 0 { ndb_unsubscribe(self.ndb.ndb, subid) Task { await self.unsetContinuation(subscriptionId: subid) } } filtersPointer.deallocate() } if !streaming { return } // Set up subscription subid = ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count)) // We are setting the continuation after issuing the subscription call. // This won't cause lost notes because if any notes get issued before registering // the continuation they will get queued by `Ndb.CallbackHandler` let continuationSetupTask = Task { await self.setContinuation(for: subid, continuation: continuation) } // Update termination handler to include subscription cleanup continuation.onTermination = { @Sendable _ in guard !terminationStarted else { return } // Avoid race conditions between two termination closures terminationStarted = true Log.debug("ndb_wait: stream: Terminated early", for: .ndb) streaming = false continuationSetupTask.cancel() Task { await self.unsetContinuation(subscriptionId: subid) } filtersPointer.deallocate() guard !self.is_closed else { return } // Double-check Ndb is open before sending unsubscribe ndb_unsubscribe(self.ndb.ndb, subid) } } } func subscribe(filters: [NdbFilter], maxSimultaneousResults: Int = 1000) throws -> AsyncStream { guard !self.is_closed else { throw NdbStreamError.ndbClosed } do { try Task.checkCancellation() } catch { throw NdbStreamError.cancelled } // CRITICAL: Create the subscription FIRST before querying to avoid race condition // This ensures that any events indexed after subscription but before query won't be missed let newEventsStream = ndbSubscribe(filters: filters) // Now fetch initial results after subscription is registered guard let txn = NdbTxn(ndb: self) else { throw NdbStreamError.cannotOpenTransaction } // Use our safe wrapper instead of direct C function call let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults) do { try Task.checkCancellation() } catch { throw NdbStreamError.cancelled } // Create a cascading stream that combines initial results with new events return AsyncStream { continuation in // Stream all results already present in the database for noteId in noteIds { if Task.isCancelled { return } continuation.yield(.event(noteId)) } // Indicate this is the end of the results currently present in the database continuation.yield(.eose) // Create a task to forward events from the subscription stream let forwardingTask = Task { for await noteKey in newEventsStream { if Task.isCancelled { break } continuation.yield(.event(noteKey)) } continuation.finish() } // Handle termination by canceling the forwarding task continuation.onTermination = { @Sendable _ in forwardingTask.cancel() } } } private func waitWithoutTimeout(for noteId: NoteId) async throws(NdbLookupError) -> NdbTxn? { do { for try await item in try self.subscribe(filters: [NostrFilter(ids: [noteId])]) { switch item { case .eose: continue case .event(let noteKey): guard let txn = NdbTxn(ndb: self) else { throw NdbLookupError.cannotOpenTransaction } guard let note = self.lookup_note_by_key_with_txn(noteKey, txn: txn) else { throw NdbLookupError.internalInconsistency } if note.id == noteId { Log.debug("ndb wait: %d has matching id %s. Returning transaction", for: .ndb, noteKey, noteId.hex()) return NdbTxn.pure(ndb: self, val: note) } } } } catch { if let error = error as? NdbStreamError { throw NdbLookupError.streamError(error) } else if let error = error as? NdbLookupError { throw error } else { throw .internalInconsistency } } return nil } /// Determines if a given note was seen on a specific relay URL private func was(noteKey: NoteKey, seenOn relayUrl: String, txn: SafeNdbTxn<()>?) throws -> Bool { guard let txn = txn ?? SafeNdbTxn.new(on: self) else { throw NdbLookupError.cannotOpenTransaction } return relayUrl.withCString({ relayCString in return ndb_note_seen_on_relay(&txn.txn, noteKey, relayCString) == 1 }) } func was(noteKey: NoteKey, seenOn relayUrl: String) throws -> Bool { return try self.was(noteKey: noteKey, seenOn: relayUrl, txn: nil) } /// Determines if a given note was seen on any of the listed relay URLs private func was(noteKey: NoteKey, seenOnAnyOf relayUrls: [String], txn: SafeNdbTxn<()>? = nil) throws -> Bool { guard let txn = txn ?? SafeNdbTxn.new(on: self) else { throw NdbLookupError.cannotOpenTransaction } for relayUrl in relayUrls { if try self.was(noteKey: noteKey, seenOn: relayUrl, txn: txn) { return true } } return false } /// Determines if a given note was seen on any of the listed relay URLs func was(noteKey: NoteKey, seenOnAnyOf relayUrls: [String]) throws -> Bool { return try self.was(noteKey: noteKey, seenOnAnyOf: relayUrls, txn: nil) } // MARK: Internal ndb callback interfaces internal func setContinuation(for subscriptionId: UInt64, continuation: AsyncStream.Continuation) async { await self.callbackHandler.set(continuation: continuation, for: subscriptionId) } internal func unsetContinuation(subscriptionId: UInt64) async { await self.callbackHandler.unset(subid: subscriptionId) } // MARK: Helpers enum Errors: Error { case cannot_find_db_path case db_file_migration_error } // MARK: Deinitialization deinit { print("txn: Ndb de-init") self.close() } } // MARK: - Extensions and helper structures and functions extension Ndb { /// A class that is used to handle callbacks from nostrdb /// /// This is a separate class from `Ndb` because it simplifies the initialization logic actor CallbackHandler { /// Holds the ndb instance in the C codebase. Should be shared with `Ndb` var ndb: ndb_t? = nil /// A map from nostrdb subscription ids to stream continuations, which allows publishing note keys to active listeners var subscriptionContinuationMap: [UInt64: AsyncStream.Continuation] = [:] /// A map from nostrdb subscription ids to queued note keys (for when there are no active listeners) var subscriptionQueueMap: [UInt64: [NoteKey]] = [:] /// Maximum number of items to queue per subscription when no one is listening let maxQueueItemsPerSubscription: Int = 2000 func set(continuation: AsyncStream.Continuation, for subid: UInt64) { // Flush any queued items to the new continuation if let queuedItems = subscriptionQueueMap[subid] { for noteKey in queuedItems { continuation.yield(noteKey) } subscriptionQueueMap[subid] = nil } subscriptionContinuationMap[subid] = continuation } func unset(subid: UInt64) { subscriptionContinuationMap[subid] = nil subscriptionQueueMap[subid] = nil } func set(ndb: ndb_t?) { self.ndb = ndb } /// Handles callbacks from nostrdb subscriptions, and routes them to the correct continuation or queue func handleSubscriptionCallback(subId: UInt64, maxCapacity: Int32 = 1000) { let result = UnsafeMutablePointer.allocate(capacity: Int(maxCapacity)) defer { result.deallocate() } // Ensure we deallocate memory before leaving the function to avoid memory leaks guard let ndb else { return } let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity) for i in 0..= maxQueueItemsPerSubscription { queue.removeFirst() } queue.append(noteKey) subscriptionQueueMap[subId] = queue } } } } /// An item that comes out of a subscription stream enum StreamItem { /// End of currently stored events case eose /// An event in NostrDB available at the given note key case event(NoteKey) } /// An error that may happen during nostrdb streaming enum NdbStreamError: Error { case cannotOpenTransaction case cannotConvertFilter(any Error) case initialQueryFailed case timeout case cancelled case ndbClosed } /// An error that may happen when looking something up enum NdbLookupError: Error { case cannotOpenTransaction case streamError(NdbStreamError) case internalInconsistency case timeout case notFound } enum OperationError: Error { case genericError } } /// This callback "trampoline" function will be called when new notes arrive for NostrDB subscriptions. /// /// This is needed as a separate global function in order to allow us to pass it to the C code as a callback (We can't pass native Swift fuctions directly as callbacks). /// /// - Parameters: /// - ctx: A pointer to a context object setup during initialization. This allows this function to "find" the correct place to call. MUST be a pointer to a `CallbackHandler`, otherwise this will trigger a crash /// - subid: The NostrDB subscription ID, which identifies the subscription that is being called back @_cdecl("subscription_callback") public func subscription_callback(ctx: UnsafeMutableRawPointer?, subid: UInt64) { guard let ctx else { return } let handler = Unmanaged.fromOpaque(ctx).takeUnretainedValue() Task { await handler.handleSubscriptionCallback(subId: subid) } } #if DEBUG func getDebugCheckedRoot(byteBuffer: inout ByteBuffer) throws -> T { return getRoot(byteBuffer: &byteBuffer) } #else func getDebugCheckedRoot(byteBuffer: inout ByteBuffer) throws -> T { return getRoot(byteBuffer: &byteBuffer) } #endif func remove_file_prefix(_ str: String) -> String { return str.replacingOccurrences(of: "file://", with: "") }