//
//  Ndb.swift
//  damus
//
//  Created by William Casarin on 2023-08-25.
//
// NOTE(review): This copy of the file appears to have lost text that was enclosed in angle
// brackets: generic arguments (e.g. on `Array`, `UnsafeMutablePointer`, `AsyncStream`, and
// possibly the `NdbTxn`/`SafeNdbTxn` parameter and return types), generic parameter lists, and
// `..<` range operators together with everything that followed them up to the next `>`.
// The clearly affected spots are marked below with NOTE(review). As written, the file is not
// expected to compile; restore it from version control rather than patching by hand.

import Foundation
import OSLog

/// App-group identifier of the shared container that holds the NostrDB files (see `Ndb.db_path`).
fileprivate let APPLICATION_GROUP_IDENTIFIER = "group.com.damus"

/// Result ordering for `Ndb.text_search`.
enum NdbSearchOrder {
    case oldest_first
    case newest_first
}


/// Database error type; currently only `failed_open`.
enum DatabaseError: Error {
    case failed_open

    // NOTE(review): `DatabaseError` conforms only to `Error`, not `LocalizedError`, so this property
    // is not picked up by `localizedDescription`. Confirm whether `LocalizedError` was intended.
    var errorDescription: String? {
        switch self {
        case .failed_open:
            return "Failed to open database"
        }
    }
}

/// Swift wrapper around a nostrdb handle (`ndb_t`).
///
/// Manages the handle's lifecycle (`open`/`close`/`reopen`; `deinit` calls `close()`), exposes
/// note/profile lookups and event ingestion, and routes nostrdb subscription callbacks into Swift
/// through `CallbackHandler`.
class Ndb {
    /// The underlying nostrdb handle. `ndb.ndb == nil` makes `is_closed` report `true`.
    var ndb: ndb_t
    /// Path passed at init (`nil` = default location); reused by `reopen()`.
    let path: String?
    /// Whether this instance owns the DB files; passed to `open` as `owns_db_file`.
    let owns_db: Bool
    /// Incremented each time `close()` actually closes the database.
    var generation: Int
    private var closed: Bool
    private var callbackHandler: Ndb.CallbackHandler
    private static let DEFAULT_WRITER_SCRATCH_SIZE: Int32 = 2097152; // 2mb scratch size for the writer thread, it should match with the one specified in nostrdb.c

    /// `true` after `close()` has run, or when there is no underlying handle (e.g. `Ndb.empty`).
    var is_closed: Bool {
        self.closed || self.ndb.ndb == nil
    }

    /// Deletes the existing database files and opens a fresh database at the same location.
    ///
    /// The location is `db_path` (shared container), falling back to `old_db_path`.
    /// - Returns: The new instance, or `nil` if no path resolves or opening fails.
    static func safemode() -> Ndb? {
        guard let path = db_path ?? old_db_path else { return nil }

        // delete the database and start fresh
        // NOTE(review): `db_files_exist` requires *every* DB file to be present, so a partial set
        // (e.g. only data.mdb) is not deleted here.
        if Self.db_files_exist(path: path) {
            let file_manager = FileManager.default
            for db_file in db_files {
                // Best-effort: removal errors are ignored.
                try? file_manager.removeItem(atPath: "\(path)/\(db_file)")
            }
        }

        guard let ndb = Ndb(path: path) else {
            return nil
        }

        return ndb
    }

    /// Legacy location: NostrDB used to be stored on the app container's document directory.
    static private var old_db_path: String? {
        guard let path = FileManager.default.urls(for: .documentDirectory, in: .userDomainMask).first?.absoluteString else {
            return nil
        }
        return remove_file_prefix(path)
    }

    /// Current default location: the `group.com.damus` app-group container.
    static var db_path: String? {
        // Use the `group.com.damus` container, so that it can be accessible from other targets
        // e.g. The notification service extension needs to access Ndb data, which is done through this shared file container.
        guard let containerURL = FileManager.default.containerURL(forSecurityApplicationGroupIdentifier: APPLICATION_GROUP_IDENTIFIER) else {
            return nil
        }
        return remove_file_prefix(containerURL.absoluteString)
    }

    /// File names that together make up the on-disk database.
    static private var db_files: [String] = ["data.mdb", "lock.mdb"]

    /// A placeholder instance with no underlying handle (`ndb.ndb == nil`, so `is_closed` is `true`).
    static var empty: Ndb {
        print("txn: NOSTRDB EMPTY")
        return Ndb(ndb: ndb_t(ndb: nil))
    }

    /// Opens (or creates) a nostrdb database and hooks up subscription callbacks.
    ///
    /// - Parameters:
    ///   - path: Database directory. `nil` means `db_path`; in that case, when `owns_db_file` is
    ///     `true`, a migration from the legacy location is attempted first. A `file://` prefix is stripped.
    ///   - owns_db_file: When `false`, returns `nil` unless the DB files already exist.
    ///   - callbackHandler: Passed to C as an *unretained* context pointer, so the caller must keep it
    ///     alive for as long as the returned handle is open.
    /// - Returns: The opened handle, or `nil` if no path resolves or every `ndb_init` attempt fails.
    static func open(path: String? = nil, owns_db_file: Bool = true, callbackHandler: Ndb.CallbackHandler) -> ndb_t? {
        var ndb_p: OpaquePointer? = nil

        let ingest_threads: Int32 = 4
        // Initial map size: 32 GiB. It is halved after each failed ndb_init (see loop below).
        var mapsize: Int = 1024 * 1024 * 1024 * 32

        if path == nil && owns_db_file {
            // `nil` path indicates the default path will be used.
            // The default path changed over time, so migrate the database to the new location if needed
            do {
                try Self.migrate_db_location_if_needed()
            }
            catch {
                // If it fails to migrate, the app can still run without serious consequences. Log instead.
                Log.error("Error migrating NostrDB to new file container", for: .storage)
            }
        }

        // NOTE(review): this existence check always uses the default `db_path`, even when an explicit
        // `path` was passed. Confirm this is intended when `owns_db_file == false`.
        guard let db_path = Self.db_path,
              owns_db_file || Self.db_files_exist(path: db_path) else {
            return nil  // If the caller claims to not own the DB file, and the DB files do not exist, then we should not initialize Ndb
        }

        guard let path = path.map(remove_file_prefix) ?? Ndb.db_path else {
            return nil
        }

        let ok = path.withCString { testdir in
            var ok = false
            // Retry with half the map size after each failure. The smallest size attempted is 1 GiB;
            // the next halving (512 MiB) is below the 700 MiB floor and ends the loop.
            while !ok && mapsize > 1024 * 1024 * 700 {
                var cfg = ndb_config(flags: 0, ingester_threads: ingest_threads, writer_scratch_buffer_size: DEFAULT_WRITER_SCRATCH_SIZE, mapsize: mapsize, filter_context: nil, ingest_filter: nil, sub_cb_ctx: nil, sub_cb: nil)

                // Here we hook up the global callback function for subscription callbacks.
                // We do an "unretained" pass here because the lifetime of the callback handler is larger than the lifetime of the nostrdb monitor in the C code.
                // The NostrDB monitor that makes the callbacks should in theory _never_ outlive the callback handler.
                //
                // This means that:
                // - for as long as nostrdb is running, its parent Ndb instance will be alive, keeping the callback handler alive.
                // - when the Ndb instance is deinitialized — and the callback handler comes down with it — the `deinit` function will destroy the nostrdb monitor, preventing it from accessing freed memory.
                //
                // Therefore, we do not need to increase reference count to callbackHandler. The tightly coupled lifetimes will ensure that it is always alive when the ndb_monitor is alive.
                let ctx: UnsafeMutableRawPointer = Unmanaged.passUnretained(callbackHandler).toOpaque()
                ndb_config_set_subscription_callback(&cfg, subscription_callback, ctx)

                let res = ndb_init(&ndb_p, testdir, &cfg);
                ok = res != 0;
                if !ok {
                    Log.error("ndb_init failed: %d, reducing mapsize from %d to %d", for: .storage, res, mapsize, mapsize / 2)
                    mapsize /= 2
                }
            }
            return ok
        }

        if !ok {
            return nil
        }

        let ndb_instance = ndb_t(ndb: ndb_p)
        // NOTE(review): the handler receives the new handle asynchronously. Until this Task runs,
        // `handleSubscriptionCallback` sees `ndb == nil` and does not poll. After `reopen()`, the
        // handler may briefly still hold the previous handle, which `close()` already destroyed.
        Task { await callbackHandler.set(ndb: ndb_instance) }
        return ndb_instance
    }

    /// Opens the database at `path` (default location when `nil`).
    /// - Returns: `nil` if `Ndb.open` fails.
    init?(path: String? = nil, owns_db_file: Bool = true) {
        let callbackHandler = Ndb.CallbackHandler()
        guard let db = Self.open(path: path, owns_db_file: owns_db_file, callbackHandler: callbackHandler) else {
            return nil
        }
        self.generation = 0
        self.path = path
        self.owns_db = owns_db_file
        self.ndb = db
        self.closed = false
        self.callbackHandler = callbackHandler
    }

    /// Moves the DB files from the legacy documents directory into the shared container when they
    /// exist only in the legacy location.
    /// - Throws: `Errors.cannot_find_db_path` if either location can't be resolved;
    ///   `Errors.db_file_migration_error` if a move fails (the underlying error is discarded).
    private static func migrate_db_location_if_needed() throws {
        guard let old_db_path, let db_path else {
            throw Errors.cannot_find_db_path
        }

        let file_manager = FileManager.default

        let old_db_files_exist = Self.db_files_exist(path: old_db_path)
        let new_db_files_exist = Self.db_files_exist(path: db_path)

        // Migration rules:
        // 1. If DB files exist in the old path but not the new one, move files to the new path
        // 2. If files do not exist anywhere, do nothing (let new DB be initialized)
        // 3. If files exist in the new path, but not the old one, nothing needs to be done
        // 4. If files exist on both, do nothing.
        //    Scenario 4 likely means that user has downgraded and re-upgraded.
        //    Although it might make sense to get the most recent DB, it might lead to data loss.
        //    If we leave both intact, it makes it easier to fix later, as no data loss would occur.
        // ("exist" here means *all* files in `db_files`, per `db_files_exist`.)
        if old_db_files_exist && !new_db_files_exist {
            Log.info("Migrating NostrDB to new file location…", for: .storage)
            do {
                // NOTE(review): files are moved one at a time. If a later move fails, earlier files
                // have already been moved and are not rolled back.
                try db_files.forEach { db_file in
                    let old_path = "\(old_db_path)/\(db_file)"
                    let new_path = "\(db_path)/\(db_file)"
                    try file_manager.moveItem(atPath: old_path, toPath: new_path)
                }
                Log.info("NostrDB files successfully migrated to the new location", for: .storage)
            } catch {
                throw Errors.db_file_migration_error
            }
        }
    }

    /// Returns `true` only if every file listed in `db_files` exists under `path`.
    private static func db_files_exist(path: String) -> Bool {
        return db_files.allSatisfy { FileManager.default.fileExists(atPath: "\(path)/\($0)") }
    }

    /// Wraps an existing handle without opening anything (used e.g. by `Ndb.empty`).
    init(ndb: ndb_t) {
        self.ndb = ndb
        self.generation = 0
        self.path = nil
        self.owns_db = true
        self.closed = false
        // This simple initialization will cause subscriptions not to be ever called. Probably fine because this initializer is used only for empty example ndb instances.
        self.callbackHandler = Ndb.CallbackHandler()
    }

    /// Destroys the underlying handle (no-op if already closed) and increments `generation`.
    func close() {
        guard !self.is_closed else { return }
        self.closed = true
        print("txn: CLOSING NOSTRDB")
        ndb_destroy(self.ndb.ndb)
        self.generation += 1
        print("txn: NOSTRDB CLOSED")
    }

    /// Reopens a closed database using the original `path`, `owns_db` and callback handler.
    /// - Returns: `false` if the database is not closed or opening fails.
    func reopen() -> Bool {
        guard self.is_closed,
              let db = Self.open(path: self.path, owns_db_file: self.owns_db, callbackHandler: self.callbackHandler) else {
            return false
        }

        print("txn: NOSTRDB REOPENED (gen \(generation))")

        self.closed = false
        self.ndb = db
        return true
    }

    /// Fetches block metadata for a note key (`ndb_get_blocks_by_key`) within an existing transaction.
    func lookup_blocks_by_key_with_txn(_ key: NoteKey, txn: RawNdbTxnAccessible) -> NdbBlockGroup.BlocksMetadata? {
        guard let blocks = ndb_get_blocks_by_key(self.ndb.ndb, &txn.txn, key) else {
            return nil
        }

        return NdbBlockGroup.BlocksMetadata(ptr: blocks)
    }

    /// Opens a new transaction and looks up the blocks for `key` inside it.
    func lookup_blocks_by_key(_ key: NoteKey) -> SafeNdbTxn? {
        // NOTE(review): the return type appears to have lost its generic argument
        // (`SafeNdbTxn` is generic, cf. `SafeNdbTxn<()>` further down).
        SafeNdbTxn.new(on: self) { txn in
            lookup_blocks_by_key_with_txn(key, txn: txn)
        }
    }

    /// Looks up a note by database key within `txn`.
    /// The note is created with `owned: false`; it presumably borrows memory from `txn` and should
    /// not outlive it. TODO confirm.
    func lookup_note_by_key_with_txn(_ key: NoteKey, txn: NdbTxn) -> NdbNote? {
        var size: Int = 0
        guard let note_p = ndb_get_note_by_key(&txn.txn, key, &size) else {
            return nil
        }
        let ptr = ndb_note_ptr(ptr: note_p)
        return NdbNote(note: ptr, size: size, owned: false, key: key)
    }

    /// Full-text search over notes.
    /// - Parameters:
    ///   - query: Text to search for.
    ///   - limit: Result limit passed to nostrdb.
    ///   - order: `.newest_first` (descending) or `.oldest_first` (ascending).
    /// - Returns: Matching note keys, or `[]` if no transaction can be opened or `ndb_text_search` returns 0.
    func text_search(query: String, limit: Int = 128, order: NdbSearchOrder = .newest_first) -> [NoteKey] {
        guard let txn = NdbTxn(ndb: self) else { return [] }
        var results = ndb_text_search_results()
        let res = query.withCString { q in
            let order = order == .newest_first ? NDB_ORDER_DESCENDING : NDB_ORDER_ASCENDING
            var config = ndb_text_search_config(order: order, limit: Int32(limit))
            return ndb_text_search(&txn.txn, q, &results, &config)
        }

        if res == 0 {
            return []
        }

        var note_ids = [NoteKey]()
        // NOTE(review): text is missing here. `0..` is not a Swift range operator. The loop that
        // copies keys out of `results`, this function's `return`/closing brace, and the header of the
        // next method (a by-key note lookup, judging by the body below that uses `key`) appear to have
        // been stripped along with a `<...>` span. Restore from version control.
        for i in 0.. NdbTxn? {
            return NdbTxn(ndb: self) { txn in
                lookup_note_by_key_with_txn(key, txn: txn)
            }
        }

    /// Reads the profile stored under `key` within `txn`.
    private func lookup_profile_by_key_inner(_ key: ProfileKey, txn: NdbTxn) -> ProfileRecord? {
        var size: Int = 0
        guard let profile_p = ndb_get_profile_by_key(&txn.txn, key, &size) else {
            return nil
        }

        return profile_flatbuf_to_record(ptr: profile_p, size: size, key: key)
    }

    /// Decodes an `NdbProfileRecord` flatbuffer of `size` bytes at `ptr` into a `ProfileRecord`.
    /// - Returns: `nil` (after printing the error) if decoding throws.
    private func profile_flatbuf_to_record(ptr: UnsafeMutableRawPointer, size: Int, key: UInt64) -> ProfileRecord? {
        do {
            var buf = ByteBuffer(assumingMemoryBound: ptr, capacity: size)
            let rec: NdbProfileRecord = try getDebugCheckedRoot(byteBuffer: &buf)
            return ProfileRecord(data: rec, key: key)
        } catch {
            // Handle error appropriately
            print("UNUSUAL: \(error)")
            return nil
        }
    }

    /// Looks up a note by id within `txn`. The note is created with `owned: false`.
    private func lookup_note_with_txn_inner(id: NoteId, txn: NdbTxn) -> NdbNote? {
        return id.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> NdbNote? in
            var key: UInt64 = 0
            var size: Int = 0
            guard let baseAddress = ptr.baseAddress,
                  let note_p = ndb_get_note_by_id(&txn.txn, baseAddress, &size, &key) else {
                return nil
            }
            let ptr = ndb_note_ptr(ptr: note_p)
            return NdbNote(note: ptr, size: size, owned: false, key: key)
        }
    }

    /// Looks up a profile by pubkey within `txn`.
    private func lookup_profile_with_txn_inner(pubkey: Pubkey, txn: NdbTxn) -> ProfileRecord? {
        return pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> ProfileRecord? in
            var size: Int = 0
            var key: UInt64 = 0

            guard let baseAddress = ptr.baseAddress,
                  let profile_p = ndb_get_profile_by_pubkey(&txn.txn, baseAddress, &size, &key) else {
                return nil
            }

            return profile_flatbuf_to_record(ptr: profile_p, size: size, key: key)
        }
    }

    /// Looks up a profile by database key within an existing transaction.
    func lookup_profile_by_key_with_txn(key: ProfileKey, txn: NdbTxn) -> ProfileRecord? {
        lookup_profile_by_key_inner(key, txn: txn)
    }

    /// Opens a new transaction and looks up the profile stored under `key` inside it.
    func lookup_profile_by_key(key: ProfileKey) -> NdbTxn? {
        return NdbTxn(ndb: self) { txn in
            lookup_profile_by_key_inner(key, txn: txn)
        }
    }

    /// Looks up a note by id within an existing transaction.
    func lookup_note_with_txn(id: NoteId, txn: NdbTxn) -> NdbNote? {
        lookup_note_with_txn_inner(id: id, txn: txn)
    }

    /// Returns the profile key for `pubkey`, or `nil` if not found or no transaction can be opened.
    func lookup_profile_key(_ pubkey: Pubkey) -> ProfileKey? {
        guard let txn = NdbTxn(ndb: self, with: { txn in
            lookup_profile_key_with_txn(pubkey, txn: txn)
        }) else {
            return nil
        }

        return txn.value
    }

    /// Returns the profile key for `pubkey` within `txn`; a C result of 0 is mapped to `nil`.
    func lookup_profile_key_with_txn(_ pubkey: Pubkey, txn: NdbTxn) -> ProfileKey? {
        // NOTE(review): the closure is typed `NoteKey?` while the function returns `ProfileKey?`.
        // Presumably both alias the same integer type; confirm.
        return pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> NoteKey? in
            guard let p = ptr.baseAddress else { return nil }
            let r = ndb_get_profilekey_by_pubkey(&txn.txn, p)
            if r == 0 {
                return nil
            }
            return r
        }
    }

    /// Returns the note key for `id` within `txn`; a C result of 0 is mapped to `nil`.
    func lookup_note_key_with_txn(_ id: NoteId, txn: some RawNdbTxnAccessible) -> NoteKey? {
        // NOTE(review): checks `closed` rather than `is_closed`, unlike `process_event` and `query`.
        guard !closed else { return nil }
        return id.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> NoteKey? in
            guard let p = ptr.baseAddress else {
                return nil
            }
            let r = ndb_get_notekey_by_id(&txn.txn, p)
            if r == 0 {
                return nil
            }
            return r
        }
    }

    /// Returns the note key for `id`, or `nil` if not found or no transaction can be opened.
    func lookup_note_key(_ id: NoteId) -> NoteKey? {
        guard let txn = NdbTxn(ndb: self, with: { txn in
            lookup_note_key_with_txn(id, txn: txn)
        }) else {
            return nil
        }

        return txn.value
    }

    /// Opens a (optionally named) transaction and looks up the note `id` inside it.
    func lookup_note(_ id: NoteId, txn_name: String? = nil) -> NdbTxn? {
        NdbTxn(ndb: self, name: txn_name) { txn in
            lookup_note_with_txn_inner(id: id, txn: txn)
        }
    }

    /// Opens a (optionally named) transaction and looks up the profile for `pubkey` inside it.
    func lookup_profile(_ pubkey: Pubkey, txn_name: String? = nil) -> NdbTxn? {
        NdbTxn(ndb: self, name: txn_name) { txn in
            lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn)
        }
    }

    /// Looks up the profile for `pubkey` within an existing transaction.
    func lookup_profile_with_txn(_ pubkey: Pubkey, txn: NdbTxn) -> ProfileRecord? {
        lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn)
    }

    /// Passes `str` to `ndb_process_client_event`.
    /// - Returns: `false` if closed or the C call returns 0.
    func process_client_event(_ str: String) -> Bool {
        guard !self.is_closed else { return false }
        return str.withCString { cstr in
            return ndb_process_client_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
        }
    }

    /// Records when the profile for `pubkey` was last fetched (`ndb_write_last_profile_fetch`).
    func write_profile_last_fetched(pubkey: Pubkey, fetched_at: UInt64) {
        // NOTE(review): checks `closed` only. On `Ndb.empty` (`ndb.ndb == nil`, `closed == false`)
        // this passes a nil handle to C.
        guard !closed else { return }
        let _ = pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> () in
            guard let p = ptr.baseAddress else { return }
            ndb_write_last_profile_fetch(ndb.ndb, p, fetched_at)
        }
    }

    /// Reads the last profile-fetch timestamp for `pubkey`; a C result of 0 is mapped to `nil`.
    func read_profile_last_fetched(txn: NdbTxn, pubkey: Pubkey) -> UInt64? {
        guard !closed else { return nil }
        return pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> UInt64? in
            guard let p = ptr.baseAddress else { return nil }
            let res = ndb_read_last_profile_fetch(&txn.txn, p)
            if res == 0 {
                return nil
            }

            return res
        }
    }

    /// Ingests a raw event string, optionally tagging it with the relay URL it came from.
    /// - Returns: `false` if closed or the C call returns 0.
    func process_event(_ str: String, originRelayURL: String? = nil) -> Bool {
        guard !is_closed else { return false }
        guard let originRelayURL else {
            return str.withCString { cstr in
                return ndb_process_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
            }
        }
        return str.withCString { cstr in
            return originRelayURL.withCString { originRelayCString in
                // NOTE(review): the pointee type of this allocation appears to be missing (generic
                // argument likely stripped). `ndb_ingest_meta_init` suggests an ingest-metadata struct.
                let meta = UnsafeMutablePointer.allocate(capacity: 1)
                defer { meta.deallocate() }
                ndb_ingest_meta_init(meta, 0, originRelayCString)
                return ndb_process_event_with(ndb.ndb, cstr, Int32(str.utf8.count), meta) != 0
            }
        }
    }

    /// Passes `str` (presumably several events) to `ndb_process_events`.
    /// - Returns: `false` if closed or the C call returns 0.
    func process_events(_ str: String) -> Bool {
        guard !is_closed else { return false }
        return str.withCString { cstr in
            return ndb_process_events(ndb.ndb, cstr, str.utf8.count) != 0
        }
    }

    /// Searches profiles matching `search` within `txn`.
    /// - Returns: Matching pubkeys. The first match is appended before the loop, and the loop then
    ///   appends up to `limit` more, so as many as `limit + 1` pubkeys can be returned.
    func search_profile(_ search: String, limit: Int, txn: NdbTxn) -> [Pubkey] {
        // NOTE(review): `Array()` has no element type here. The generic argument (presumably `Pubkey`)
        // appears to have been stripped.
        var pks = Array()

        return search.withCString { q in
            var s = ndb_search()
            guard ndb_search_profile(&txn.txn, &s, q) != 0 else {
                return pks
            }

            defer { ndb_search_profile_end(&s) }
            pks.append(Pubkey(Data(bytes: &s.key.pointee.id.0, count: 32)))

            var n = limit
            while n > 0 {
                guard ndb_search_profile_next(&s) != 0 else {
                    return pks
                }
                pks.append(Pubkey(Data(bytes: &s.key.pointee.id.0, count: 32)))

                n -= 1
            }

            return pks
        }
    }

    // MARK: NdbFilter queries and subscriptions

    /// Safe wrapper around the `ndb_query` C function
    /// - Parameters:
    ///   - txn: Database transaction
    ///   - filters: Array of NdbFilter objects
    ///   - maxResults: Maximum number of results to return
    /// - Returns: Array of note keys matching the filters
    /// - Throws: NdbStreamError if the query fails
    func query(with txn: NdbTxn, filters: [NdbFilter], maxResults: Int) throws(NdbStreamError) -> [NoteKey] {
        guard !self.is_closed else { throw .ndbClosed }
        // NOTE(review): the pointee types of the three allocations below appear to be missing
        // (generic arguments likely stripped from this copy).
        let filtersPointer = UnsafeMutablePointer.allocate(capacity: filters.count)
        defer { filtersPointer.deallocate() }

        for (index, ndbFilter) in filters.enumerated() {
            filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter
        }

        let count = UnsafeMutablePointer.allocate(capacity: 1)
        defer { count.deallocate() }

        let results = UnsafeMutablePointer.allocate(capacity: maxResults)
        defer { results.deallocate() }

        // Checked again right before calling into C.
        guard !self.is_closed else { throw .ndbClosed }
        guard ndb_query(&txn.txn, filtersPointer, Int32(filters.count), results, Int32(maxResults), count) == 1 else {
            throw NdbStreamError.initialQueryFailed
        }

        var noteIds: [NoteKey] = []
        // NOTE(review): text is missing here, as in `text_search` above. The loop over the `count`
        // results, `query`'s return and closing brace, and the header of the private subscription
        // helper that `subscribe` calls as `ndbSubscribe(filters:)` appear to have been stripped.
        // What follows is that helper's body.
        for i in 0.. AsyncStream {
            return AsyncStream { continuation in
                // Allocate filters pointer - will be deallocated when subscription ends
                // Cannot use `defer` to deallocate `filtersPointer` because it needs to remain valid for the lifetime of the subscription, which extends beyond this block's scope.
                let filtersPointer = UnsafeMutablePointer.allocate(capacity: filters.count)
                for (index, ndbFilter) in filters.enumerated() {
                    filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter
                }

                var streaming = true
                var subid: UInt64 = 0
                // NOTE(review): these are plain captured `var`s mutated from `@Sendable` termination
                // closures without synchronization, so the check-then-set on `terminationStarted`
                // is not atomic.
                var terminationStarted = false

                // Set up termination handler
                continuation.onTermination = { @Sendable _ in
                    guard !terminationStarted else { return }  // Avoid race conditions between two termination closures
                    terminationStarted = true
                    Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
                    streaming = false
                    // Clean up resources on early termination
                    if subid != 0 {
                        ndb_unsubscribe(self.ndb.ndb, subid)
                        Task { await self.unsetCallback(subscriptionId: subid) }
                    }
                    filtersPointer.deallocate()
                }

                if !streaming {
                    return
                }

                // Set up subscription
                subid = ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count))

                // Set the subscription callback
                // NOTE(review): the callback is registered asynchronously, after the subscription
                // exists. `handleSubscriptionCallback` does nothing for ids with no registered
                // callback, so notes signalled in that window may be delayed or missed.
                Task {
                    await self.setCallback(for: subid, callback: { noteKey in
                        continuation.yield(.event(noteKey))
                    })
                }

                // Update termination handler to include subscription cleanup
                continuation.onTermination = { @Sendable _ in
                    guard !terminationStarted else { return }  // Avoid race conditions between two termination closures
                    terminationStarted = true
                    Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
                    streaming = false
                    ndb_unsubscribe(self.ndb.ndb, subid)
                    Task { await self.unsetCallback(subscriptionId: subid) }
                    filtersPointer.deallocate()
                }
            }
        }

    /// Streams notes matching `filters`: first those already in the database (up to
    /// `maxSimultaneousResults`), then `.eose`, then new matches as they arrive.
    /// - Throws: `.ndbClosed`, `.cannotOpenTransaction`, `.cancelled`, or errors from `query`.
    func subscribe(filters: [NdbFilter], maxSimultaneousResults: Int = 1000) throws(NdbStreamError) -> AsyncStream {
        guard !self.is_closed else { throw .ndbClosed }

        // Fetch initial results
        guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction }

        do { try Task.checkCancellation() } catch { throw .cancelled }

        // Use our safe wrapper instead of direct C function call
        let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults)

        do { try Task.checkCancellation() } catch { throw .cancelled }

        // Create a subscription for new events
        // NOTE(review): the subscription is created after the initial query, so a note ingested
        // between the two calls may appear in neither.
        let newEventsStream = ndbSubscribe(filters: filters)

        // Create a cascading stream that combines initial results with new events
        return AsyncStream { continuation in
            // Stream all results already present in the database
            for noteId in noteIds {
                // NOTE(review): returning here leaves `continuation` unfinished and never consumes
                // `newEventsStream`. Confirm the underlying subscription is still torn down.
                if Task.isCancelled { return }
                continuation.yield(.event(noteId))
            }

            // Indicate this is the end of the results currently present in the database
            continuation.yield(.eose)

            // Create a task to forward events from the subscription stream
            let forwardingTask = Task {
                for await item in newEventsStream {
                    try Task.checkCancellation()
                    continuation.yield(item)
                }
                continuation.finish()
            }

            // Handle termination by canceling the forwarding task
            continuation.onTermination = { @Sendable _ in
                forwardingTask.cancel()
            }
        }
    }

    /// Subscribes to `noteId` and returns a transaction wrapping the note once it is seen.
    /// - Returns: `nil` if the stream ends without a match.
    /// - Throws: `NdbLookupError`; stream errors are wrapped in `.streamError`, anything else
    ///   becomes `.internalInconsistency`.
    private func waitWithoutTimeout(for noteId: NoteId) async throws(NdbLookupError) -> NdbTxn? {
        do {
            for try await item in try self.subscribe(filters: [NostrFilter(ids: [noteId])]) {
                switch item {
                case .eose:
                    continue
                case .event(let noteKey):
                    guard let txn = NdbTxn(ndb: self) else { throw NdbLookupError.cannotOpenTransaction }
                    guard let note = self.lookup_note_by_key_with_txn(noteKey, txn: txn) else { throw NdbLookupError.internalInconsistency }
                    if note.id == noteId {
                        Log.debug("ndb wait: %d has matching id %s. Returning transaction", for: .ndb, noteKey, noteId.hex())
                        // NOTE(review): `note` was created with `owned: false` inside `txn`, which is not
                        // returned. Confirm `NdbTxn.pure` keeps the note's memory valid afterwards.
                        return NdbTxn.pure(ndb: self, val: note)
                    }
                }
            }
        }
        catch {
            if let error = error as? NdbStreamError {
                throw NdbLookupError.streamError(error)
            }
            else if let error = error as? NdbLookupError {
                throw error
            }
            else {
                throw .internalInconsistency
            }
        }
        return nil
    }

    /// Waits (up to about `timeout` seconds) for `noteId` to appear in the database.
    /// - Returns: A transaction wrapping the note, or `nil` if the subscription ended without a match.
    /// - Throws: `NdbLookupError.timeout` on timeout, or another `NdbLookupError`.
    func waitFor(noteId: NoteId, timeout: TimeInterval = 10) async throws(NdbLookupError) -> NdbTxn? {
        do {
            return try await withCheckedThrowingContinuation({ continuation in
                // NOTE(review): `done` is read and written from two concurrently running Tasks without
                // synchronization. Both could pass `!done` and resume `continuation` twice, which a
                // checked continuation reports as misuse.
                var done = false
                let waitTask = Task {
                    do {
                        Log.debug("ndb_wait: Waiting for %s", for: .ndb, noteId.hex())
                        let result = try await self.waitWithoutTimeout(for: noteId)
                        if !done {
                            Log.debug("ndb_wait: Found %s", for: .ndb, noteId.hex())
                            continuation.resume(returning: result)
                            done = true
                        }
                    }
                    catch {
                        if Task.isCancelled {
                            return  // the timeout task will handle throwing the timeout error
                        }
                        if !done {
                            Log.debug("ndb_wait: Error on %s: %s", for: .ndb, noteId.hex(), error.localizedDescription)
                            continuation.resume(throwing: error)
                            done = true
                        }
                    }
                }

                // NOTE(review): `Int(timeout)` truncates fractional timeouts (e.g. 0.5 becomes 0 s).
                // This task is also never cancelled when the wait succeeds, so it always sleeps for
                // the full timeout.
                let timeoutTask = Task {
                    try await Task.sleep(for: .seconds(Int(timeout)))
                    if !done {
                        Log.debug("ndb_wait: Timeout on %s. Cancelling wait task…", for: .ndb, noteId.hex())
                        done = true
                        print("ndb_wait: throwing timeout error")
                        continuation.resume(throwing: NdbLookupError.timeout)
                    }
                    waitTask.cancel()
                }
            })
        }
        catch {
            if let error = error as? NdbLookupError { throw error }
            else { throw .internalInconsistency }
        }
    }

    /// Determines if a given note was seen on a specific relay URL
    /// - Parameter txn: Transaction to reuse; a new one is opened when `nil`.
    /// - Throws: `NdbLookupError.cannotOpenTransaction` if no transaction is available.
    func was(noteKey: NoteKey, seenOn relayUrl: String, txn: SafeNdbTxn<()>? = nil) throws -> Bool {
        guard let txn = txn ?? SafeNdbTxn.new(on: self) else {
            throw NdbLookupError.cannotOpenTransaction
        }
        return relayUrl.withCString({ relayCString in
            return ndb_note_seen_on_relay(&txn.txn, noteKey, relayCString) == 1
        })
    }

    /// Determines if a given note was seen on any of the listed relay URLs
    /// - Parameter txn: Transaction to reuse for every check; a new one is opened when `nil`.
    /// - Throws: `NdbLookupError.cannotOpenTransaction` if no transaction is available.
    func was(noteKey: NoteKey, seenOnAnyOf relayUrls: [String], txn: SafeNdbTxn<()>? = nil) throws -> Bool {
        guard let txn = txn ?? SafeNdbTxn.new(on: self) else {
            throw NdbLookupError.cannotOpenTransaction
        }
        for relayUrl in relayUrls {
            if try self.was(noteKey: noteKey, seenOn: relayUrl, txn: txn) {
                return true
            }
        }
        return false
    }

    // MARK: Internal ndb callback interfaces

    /// Registers `callback` for nostrdb subscription `subscriptionId`.
    internal func setCallback(for subscriptionId: UInt64, callback: @escaping (NoteKey) -> Void) async {
        await self.callbackHandler.set(callback: callback, for: subscriptionId)
    }

    /// Removes the callback for nostrdb subscription `subscriptionId`.
    internal func unsetCallback(subscriptionId: UInt64) async {
        await self.callbackHandler.unset(subid: subscriptionId)
    }

    // MARK: Helpers

    /// Errors thrown by the database-location migration.
    enum Errors: Error {
        case cannot_find_db_path
        case db_file_migration_error
    }

    // MARK: Deinitialization

    deinit {
        print("txn: Ndb de-init")
        self.close()
    }
}


// MARK: - Extensions and helper structures and functions

extension Ndb {
    /// An actor that is used to handle callbacks from nostrdb
    ///
    /// This is separate from `Ndb` because it simplifies the initialization logic
    actor CallbackHandler {
        /// Holds the ndb instance in the C codebase. Should be shared with `Ndb`
        var ndb: ndb_t? = nil
        /// A map from nostrdb subscription ids to callbacks
        var subscriptionCallbackMap: [UInt64: (NoteKey) -> Void] = [:]

        /// Registers (or replaces) the callback for `subid`.
        func set(callback: @escaping (NoteKey) -> Void, for subid: UInt64) {
            subscriptionCallbackMap[subid] = callback
        }

        /// Removes the callback for `subid`, if any.
        func unset(subid: UInt64) {
            subscriptionCallbackMap[subid] = nil
        }

        /// Replaces the handle used when polling for notes.
        func set(ndb: ndb_t?) {
            self.ndb = ndb
        }

        /// Handles callbacks from nostrdb subscriptions, and routes them to the correct callback
        ///
        /// Does nothing if no callback is registered for `subId` or no handle has been set yet.
        /// - Parameter maxCapacity: Maximum number of note keys polled per call.
        func handleSubscriptionCallback(subId: UInt64, maxCapacity: Int32 = 1000) {
            if let callback = subscriptionCallbackMap[subId] {
                // NOTE(review): the pointee type appears to be missing (generic argument likely stripped).
                let result = UnsafeMutablePointer.allocate(capacity: Int(maxCapacity))
                defer { result.deallocate() } // Ensure we deallocate memory before leaving the function to avoid memory leaks

                if let ndb {
                    let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity)
                    // NOTE(review): text is missing here. `0...fromOpaque(ctx)` is what remains after a
                    // `<...>` span was stripped. The loop that hands each polled key to `callback`, the
                    // closing braces of this method, the actor and the extension, and the header of the
                    // global `subscription_callback` trampoline (referenced by `Ndb.open` but not defined
                    // in this copy) appear to be gone. `handler`, `subid` and `ctx` are not declared here;
                    // the statements below look like the trampoline's body. Restore from version control.
                    for i in 0...fromOpaque(ctx).takeUnretainedValue()
    Task {
        await handler.handleSubscriptionCallback(subId: subid)
    }
}

// NOTE(review): the generic parameter list that declares `T` appears to have been stripped from
// both declarations below. Also, despite the name, the DEBUG and non-DEBUG bodies are identical
// (both call `getRoot`), so debug builds do not decode any differently. Confirm intent.
#if DEBUG
func getDebugCheckedRoot(byteBuffer: inout ByteBuffer) throws -> T {
    return getRoot(byteBuffer: &byteBuffer)
}
#else
func getDebugCheckedRoot(byteBuffer: inout ByteBuffer) throws -> T {
    return getRoot(byteBuffer: &byteBuffer)
}
#endif

/// Removes `file://` from a URL string to obtain a filesystem path.
///
/// NOTE(review): `replacingOccurrences` removes every occurrence, not just a leading prefix. Callers
/// in this file pass `URL.absoluteString`, which is percent-encoded (e.g. a space stays `%20`), so
/// `URL.path` may be a safer source. Confirm.
func remove_file_prefix(_ str: String) -> String {
    return str.replacingOccurrences(of: "file://", with: "")
}