Files
damus/nostrdb/Ndb.swift
Daniel D’Aquino 795fce1b65 Add storage usage stats settings view
This commit implements a new Storage settings view that displays storage
usage statistics for NostrDB, snapshot database, and Kingfisher image cache.

Key features:
- Interactive pie chart visualization (iOS 17+) with tap-to-select functionality
- Pull-to-refresh gesture to recalculate storage
- Categorized list showing each storage type with size and percentage
- Total storage sum displayed at bottom
- Conditional compilation for iOS 16/17+ compatibility
- All calculations run on background thread to avoid blocking main thread
- NostrDB storage breakdown

Changelog-Added: Storage usage statistics view in Settings
Changelog-Changed: Moved clear cache button to storage settings
Closes: https://github.com/damus-io/damus/issues/3649
Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
2026-02-25 15:45:37 -08:00

1319 lines
56 KiB
Swift

//
// Ndb.swift
// damus
//
// Created by William Casarin on 2023-08-25.
//
import Foundation
import OSLog
import Synchronization
// App-group identifier shared between the main app and its extensions; used to
// locate the legacy DB container and the read-only snapshot container below.
fileprivate let APPLICATION_GROUP_IDENTIFIER = "group.com.damus"
/// Sort order for full-text search results (see `text_search`).
enum NdbSearchOrder {
case oldest_first
case newest_first
}
/// Errors raised when opening the NostrDB database.
///
/// Conforms to `LocalizedError` (not just `Error`) so that `errorDescription`
/// is actually surfaced through `localizedDescription`; with plain `Error`
/// the property below would never be consulted.
enum DatabaseError: LocalizedError {
    case failed_open

    /// Human-readable description used by `localizedDescription`.
    var errorDescription: String? {
        switch self {
        case .failed_open:
            return "Failed to open database"
        }
    }
}
/// Swift wrapper around the NostrDB C database handle, managing open/close
/// lifecycle, thread-safe access, lookups, and subscription callbacks.
class Ndb {
// Opaque handle to the underlying C database; `ndb.ndb == nil` means no DB attached.
var ndb: ndb_t
// Directory the DB was opened at; `nil` means the default location was used.
let path: String?
// Whether this process owns the DB files (writer) vs. reading a shared snapshot.
let owns_db: Bool
// Incremented on every close so holders of stale handles can detect it.
var generation: Int
// Set once the DB is (being) closed; checked before any database access.
private var closed: Bool
// Receives subscription callbacks from the C layer; lifetime tied to this instance.
private var callbackHandler: Ndb.CallbackHandler
// Lock coordinating DB use against open/close transitions (see "Thread safety mechanisms").
private let ndbAccessLock: Ndb.UseLockProtocol = initLock()
private static let DEFAULT_WRITER_SCRATCH_SIZE: Int32 = 2097152; // 2mb scratch size for the writer thread, it should match with the one specified in nostrdb.c
/// True when the DB has been closed or was never attached.
var is_closed: Bool {
self.closed || self.ndb.ndb == nil
}
/// Wipes the on-disk database files and opens a fresh instance.
/// Returns `nil` when no default path is available or the fresh open fails.
static func safemode() -> Ndb? {
    guard let dbPath = db_path else { return nil }

    // Remove any existing database files so we start from a clean slate.
    if Self.db_file_exists(path: dbPath) {
        let fm = FileManager.default
        db_files.forEach { try? fm.removeItem(atPath: "\(dbPath)/\($0)") }
    }

    return Ndb(path: dbPath)
}
/// Default (private container) database directory, derived from the user's
/// documents directory; `nil` when that directory cannot be resolved.
static var db_path: String? {
    let documents = FileManager.default.urls(for: .documentDirectory, in: .userDomainMask)
    guard let base = documents.first?.absoluteString else { return nil }
    return remove_file_prefix(base)
}
// Shared app group container retained for legacy installs that still host the database there.
// Shared app group container retained for legacy installs that still host the database there.
static private var legacy_db_path: String? {
    FileManager.default
        .containerURL(forSecurityApplicationGroupIdentifier: APPLICATION_GROUP_IDENTIFIER)
        .map { remove_file_prefix($0.absoluteString) }
}
// DB read-only snapshot in the shared container so that extensions can get access to recent NostrDB data to enhance UX.
// DB read-only snapshot in the shared container so that extensions can get access to recent NostrDB data to enhance UX.
static var snapshot_db_path: String? {
    guard let container = FileManager.default.containerURL(forSecurityApplicationGroupIdentifier: APPLICATION_GROUP_IDENTIFIER) else {
        return nil
    }
    let snapshotURL = container.appendingPathComponent("snapshot", conformingTo: .directory)
    return remove_file_prefix(snapshotURL.absoluteString)
}
// Primary LMDB data file; its presence determines whether a DB exists at a path.
static let main_db_file_name: String = "data.mdb"
// All files making up an LMDB database environment.
static let db_files: [String] = ["data.mdb", "lock.mdb"]
/// Placeholder instance with no backing database; all accesses will see `is_closed`.
static var empty: Ndb {
print("txn: NOSTRDB EMPTY")
return Ndb(ndb: ndb_t(ndb: nil))
}
/// Opens a NostrDB environment and returns the raw handle, or `nil` on failure.
/// - Parameters:
///   - path: Directory holding the DB files; `nil` selects a default (see priority note below).
///   - owns_db_file: Whether this process owns/writes the DB files. Non-owners only
///     ever open a pre-existing read-only snapshot.
///   - callbackHandler: Receives subscription callbacks from the C layer; passed
///     unretained (see lifetime comment below).
static func open(path: String? = nil, owns_db_file: Bool = true, callbackHandler: Ndb.CallbackHandler) -> ndb_t? {
var ndb_p: OpaquePointer? = nil
let ingest_threads: Int32 = 4
// Start with a 32 GiB map size; halved on init failure down to a ~700 MiB floor below.
var mapsize: Int = 1024 * 1024 * 1024 * 32
if path == nil && owns_db_file {
// `nil` path indicates the default path will be used.
// The default path changed over time, so migrate the database to the new location if needed
do {
try Self.migrate_db_location_if_needed()
}
catch {
// If it fails to migrate, the app can still run without serious consequences. Log instead.
Log.error("Error migrating NostrDB to new file container", for: .storage)
}
}
// The path should be, in order of priority:
// 1. The path specified by the caller
// 2. If not specified, use a default path. The default path depends:
// a. If the process owns the db file, `Ndb.db_path` is the default.
// b. If the process does not own the db file, a read-only snapshot file (`Ndb.snapshot_db_path`) is used.
guard let path = path.map(remove_file_prefix) ?? (owns_db_file ? Ndb.db_path : Ndb.snapshot_db_path) else {
return nil
}
guard owns_db_file || Self.db_file_exists(path: path) else {
return nil // If the caller claims to not own the DB file, and the DB files do not exist, then we should not initialize Ndb
}
let ok = path.withCString { testdir in
var ok = false
// Retry with progressively smaller map sizes; some devices cannot reserve the full 32 GiB.
while !ok && mapsize > 1024 * 1024 * 700 {
var cfg = ndb_config(flags: 0, ingester_threads: ingest_threads, writer_scratch_buffer_size: DEFAULT_WRITER_SCRATCH_SIZE, mapsize: mapsize, filter_context: nil, ingest_filter: nil, sub_cb_ctx: nil, sub_cb: nil)
// Here we hook up the global callback function for subscription callbacks.
// We do an "unretained" pass here because the lifetime of the callback handler is larger than the lifetime of the nostrdb monitor in the C code.
// The NostrDB monitor that makes the callbacks should in theory _never_ outlive the callback handler.
//
// This means that:
// - for as long as nostrdb is running, its parent Ndb instance will be alive, keeping the callback handler alive.
// - when the Ndb instance is deinitialized and the callback handler comes down with it the `deinit` function will destroy the nostrdb monitor, preventing it from accessing freed memory.
//
// Therefore, we do not need to increase reference count to callbackHandler. The tightly coupled lifetimes will ensure that it is always alive when the ndb_monitor is alive.
let ctx: UnsafeMutableRawPointer = Unmanaged.passUnretained(callbackHandler).toOpaque()
ndb_config_set_subscription_callback(&cfg, subscription_callback, ctx)
let res = ndb_init(&ndb_p, testdir, &cfg);
ok = res != 0;
if !ok {
Log.error("ndb_init failed: %d, reducing mapsize from %d to %d", for: .storage, res, mapsize, mapsize / 2)
mapsize /= 2
}
}
return ok
}
if !ok {
return nil
}
let ndb_instance = ndb_t(ndb: ndb_p)
// Hand the new handle to the callback handler so it can route subscription callbacks.
Task { await callbackHandler.set(ndb: ndb_instance) }
return ndb_instance
}
/// Opens (or creates) the database, returning `nil` when the open fails.
/// - Parameters:
///   - path: Directory for the DB files; `nil` selects the default location.
///   - owns_db_file: Whether this process owns the DB files (writer) or reads a snapshot.
init?(path: String? = nil, owns_db_file: Bool = true) {
    let handler = Ndb.CallbackHandler()
    guard let database = Self.open(path: path, owns_db_file: owns_db_file, callbackHandler: handler) else {
        return nil
    }
    self.path = path
    self.owns_db = owns_db_file
    self.ndb = database
    self.generation = 0
    self.closed = false
    self.callbackHandler = handler
    // Mark the DB open only after all state is in place.
    self.ndbAccessLock.markNdbOpen()
}
/// Moves the NostrDB files from the legacy shared app-group container into the
/// private container, keeping whichever copy's `data.mdb` was modified most recently.
/// - Parameters:
///   - db_path: Override for the destination (private) path; defaults to `Self.db_path`.
///   - legacy_path: Override for the legacy path; defaults to `Self.legacy_db_path`.
/// - Throws: `Errors.cannot_find_db_path` when neither path resolves, or
///   `Errors.db_file_migration_error` when moving the files fails.
static func migrate_db_location_if_needed(db_path: String? = nil, legacy_path: String? = nil) throws {
let db_path = db_path ?? Self.db_path
let legacy_path = legacy_path ?? Self.legacy_db_path
guard let db_path, let legacy_path else {
throw Errors.cannot_find_db_path
}
// Determine which location holds the freshest database copy and ensure it resides in the private container.
let fileManager = FileManager.default
let private_db_file_exists = Self.db_file_exists(path: db_path)
let legacy_db_file_exists = Self.db_file_exists(path: legacy_path)
// Nothing to migrate or clean up if no DB exists anywhere.
guard private_db_file_exists || legacy_db_file_exists else { return }
guard let latest_path = Self.latestDatabasePath(primaryPath: db_path,
legacyPath: legacy_path,
fileManager: fileManager) else { return }
guard latest_path != db_path else {
// Desired path is already the latest path. No need to migrate.
if legacy_db_file_exists {
// Legacy db file still exists for some reason. To save space, delete this old copy
Log.info("Deleting legacy NostrDB files to save storage space…", for: .storage)
do {
try db_files.forEach { db_file in
let legacyFileURL = URL(fileURLWithPath: "\(legacy_path)/\(db_file)")
if fileManager.fileExists(atPath: legacyFileURL.path) {
try fileManager.removeItem(at: legacyFileURL)
}
}
Log.info("Legacy NostrDB files successfully deleted", for: .storage)
} catch {
// Cleanup failure is non-fatal; only log it.
Log.error("Failed to delete legacy NostrDB files: %@", for: .storage, String(describing: error))
}
}
return
}
Log.info("Migrating NostrDB files to the private container…", for: .storage)
do {
try fileManager.createDirectory(atPath: db_path, withIntermediateDirectories: true)
try db_files.forEach { db_file in
let sourceURL = URL(fileURLWithPath: "\(legacy_path)/\(db_file)")
let destinationURL = URL(fileURLWithPath: "\(db_path)/\(db_file)")
if db_file != self.main_db_file_name && !fileManager.fileExists(atPath: sourceURL.path) {
// A non-essential db file does not exist at the source, there is nothing to move. Move on to the next file
return
}
if fileManager.fileExists(atPath: destinationURL.path) {
// Use replaceItemAt for atomic replacement
_ = try fileManager.replaceItemAt(destinationURL, withItemAt: sourceURL, backupItemName: nil, options: [.usingNewMetadataOnly])
} else {
// If destination doesn't exist, just move it
try fileManager.moveItem(at: sourceURL, to: destinationURL)
}
}
Log.info("NostrDB files successfully migrated to the private container", for: .storage)
} catch {
Log.error("Failed to migrate NostrDB files: %@", for: .storage, String(describing: error))
throw Errors.db_file_migration_error
}
}
/// Whether the primary LMDB data file exists under `path`.
private static func db_file_exists(path: String) -> Bool {
    let dataFile = "\(path)/\(Self.main_db_file_name)"
    return FileManager.default.fileExists(atPath: dataFile)
}
/// Returns the path whose `data.mdb` file was modified most recently.
/// Falls back to whichever path actually has a readable `data.mdb`.
private static func latestDatabasePath(primaryPath: String,
                                       legacyPath: String?,
                                       fileManager: FileManager) -> String? {
    guard let legacyPath,
          let legacyDate = Self.lastModifiedDate(for: legacyPath, fileManager: fileManager) else {
        return primaryPath
    }
    guard let primaryDate = Self.lastModifiedDate(for: primaryPath, fileManager: fileManager) else {
        return legacyPath
    }
    return primaryDate > legacyDate ? primaryPath : legacyPath
}
/// Modification date of `data.mdb` under `path`, or `nil` when the file is
/// absent or its attributes cannot be read.
private static func lastModifiedDate(for path: String, fileManager: FileManager) -> Date? {
    let dataFilePath = path + "/data.mdb"
    guard fileManager.fileExists(atPath: dataFilePath) else { return nil }
    let attributes = try? fileManager.attributesOfItem(atPath: dataFilePath)
    return attributes?[.modificationDate] as? Date
}
/// Wraps an already-created (possibly nil) database handle.
/// Subscriptions never fire for instances built this way; this initializer
/// exists mainly for empty/example instances (see `Ndb.empty`).
init(ndb: ndb_t) {
    self.ndb = ndb
    self.path = nil
    self.owns_db = true
    self.generation = 0
    self.closed = false
    self.callbackHandler = Ndb.CallbackHandler()
}
/// Mark NostrDB as "closed" without actually closing it.
/// Useful when shutting down tasks that use NostrDB while avoiding new tasks from using it.
/// Note: does not free the underlying C resources — only `close()` does that.
func markClosed() {
self.closed = true
}
/// Closes NostrDB, waiting (up to 2s) for in-flight users to finish first.
func close() {
guard !self.is_closed else { return }
// Flag closed first so no new work starts while we wait for current users.
self.closed = true
// NOTE(review): `try!` crashes if waiting for the lock times out — confirm a
// crash is the intended failure mode for a stuck shutdown.
try! self.ndbAccessLock.waitUntilNdbCanClose(thenClose: {
print("txn: CLOSING NOSTRDB")
ndb_destroy(self.ndb.ndb)
// Bump the generation so holders of stale handles can detect the close.
self.generation += 1
print("txn: NOSTRDB CLOSED")
return false
}, maxTimeout: .milliseconds(2000))
}
/// Reopens a previously-closed database.
/// - Returns: `false` when the DB is still open or the underlying open fails.
func reopen() -> Bool {
guard self.is_closed,
let db = Self.open(path: self.path, owns_db_file: self.owns_db, callbackHandler: self.callbackHandler) else {
return false
}
print("txn: NOSTRDB REOPENED (gen \(generation))")
self.ndb = db // Set the new DB before marking it as open to prevent access to the old DB
self.closed = false
self.ndbAccessLock.markNdbOpen()
return true
}
/// Makes a copy of the database in a separate location
///
/// This uses `mdb_env_copy2` which creates a consistent snapshot without blocking writers for long periods.
/// - Parameter path: Destination directory for the snapshot.
/// - Throws: `SnapshotError.mdbOperationError` with the C return code on failure.
func snapshot(path: String) throws {
    enum SnapshotError: Error {
        case mdbOperationError(errno: Int32)
    }
    try withNdb {
        try path.withCString { cPath in
            let status = ndb_snapshot(self.ndb.ndb, cPath, UInt32(0))
            guard status == 0 else {
                throw SnapshotError.mdbOperationError(errno: status)
            }
        }
    }
}
// MARK: Thread safety mechanisms
// Use these for all externally accessible methods that interact with the nostrdb database to prevent race conditions with app lifecycle events (i.e. NostrDB opening and closing)
/// Runs `useFunction` while holding the access lock, guaranteeing the DB cannot
/// be closed underneath it.
/// - Parameters:
///   - useFunction: Work to perform against the open database.
///   - maxWaitTimeout: How long to wait for lock acquisition before giving up.
/// - Throws: `NdbStreamError.ndbClosed` when the DB is closed; rethrows from `useFunction`.
internal func withNdb<T>(_ useFunction: () throws -> T, maxWaitTimeout: DispatchTimeInterval = .milliseconds(500)) throws -> T {
guard !self.is_closed else { throw NdbStreamError.ndbClosed }
return try self.ndbAccessLock.keepNdbOpen(during: {
// Double-check things to avoid TOCTOU race conditions
guard !self.is_closed else { throw NdbStreamError.ndbClosed }
return try useFunction()
}, maxWaitTimeout: maxWaitTimeout)
}
// MARK: Lookup and query functions
// GH_3245 TODO: This is a low level call, make it hidden from outside Ndb
/// Fetches parsed block metadata for the note at `key` within `txn`, or `nil` when none exists.
internal func lookup_blocks_by_key_with_txn(_ key: NoteKey, txn: RawNdbTxnAccessible) -> NdbBlockGroup.BlocksMetadata? {
    let blocksPtr = ndb_get_blocks_by_key(self.ndb.ndb, &txn.txn, key)
    return blocksPtr.map { NdbBlockGroup.BlocksMetadata(ptr: $0) }
}
/// Looks up block metadata for `key` and lends it (or `nil`) to `lendingFunction`
/// within a safe transaction.
func lookup_blocks_by_key<T>(_ key: NoteKey, borrow lendingFunction: (_: borrowing NdbBlockGroup.BlocksMetadata?) throws -> T) throws -> T {
    try withNdb {
        let maybeTxn = SafeNdbTxn<NdbBlockGroup.BlocksMetadata?>.new(on: self) { txn in
            lookup_blocks_by_key_with_txn(key, txn: txn)
        }
        guard let maybeTxn else {
            return try lendingFunction(nil)
        }
        return try lendingFunction(maybeTxn.val)
    }
}
/// Reads the raw note stored at `key` within `txn`. The returned note is unowned
/// (it borrows transaction memory) and must not outlive the transaction.
private func lookup_note_by_key_with_txn<Y>(_ key: NoteKey, txn: NdbTxn<Y>) -> NdbNote? {
    var noteSize: Int = 0
    guard let notePtr = ndb_get_note_by_key(&txn.txn, key, &noteSize) else {
        return nil
    }
    return NdbNote(note: ndb_note_ptr(ptr: notePtr), size: noteSize, owned: false, key: key)
}
/// Full-text search over stored notes.
/// - Parameters:
///   - query: Search string passed to `ndb_text_search`.
///   - limit: Maximum number of results requested from the C layer.
///   - order: Whether newest or oldest matches come first.
/// - Returns: Note keys for the matches; empty on failure or no matches.
func text_search(query: String, limit: Int = 128, order: NdbSearchOrder = .newest_first) throws -> [NoteKey] {
    return try withNdb({
        guard let txn = NdbTxn(ndb: self) else { return [] }
        var results = ndb_text_search_results()
        let res = query.withCString { q in
            let order = order == .newest_first ? NDB_ORDER_DESCENDING : NDB_ORDER_ASCENDING
            var config = ndb_text_search_config(order: order, limit: Int32(limit))
            return ndb_text_search(&txn.txn, q, &results, &config)
        }
        if res == 0 {
            return []
        }
        // `results.results` is imported from C as a homogeneous tuple of
        // `ndb_text_search_result`. Rather than switching over every index
        // (the old 128-arm switch), view the tuple's bytes as a buffer of
        // results and read each entry directly. Clamp the count to the tuple's
        // capacity, matching the old behavior for out-of-range indices.
        let capacity = MemoryLayout.size(ofValue: results.results) / MemoryLayout<ndb_text_search_result>.stride
        let count = min(Int(results.num_results), capacity)
        var note_ids = [NoteKey]()
        note_ids.reserveCapacity(count)
        withUnsafeBytes(of: results.results) { raw in
            let entries = raw.bindMemory(to: ndb_text_search_result.self)
            for i in 0..<count {
                note_ids.append(entries[i].key.note_id)
            }
        }
        return note_ids
    })
}
/// Lends the note stored at `key` (or `nil` when absent) to `lendingFunction`
/// within a transaction.
func lookup_note_by_key<T>(_ key: NoteKey, borrow lendingFunction: (_: borrowing UnownedNdbNote?) throws -> T) throws -> T {
    try withNdb {
        let txn = NdbTxn(ndb: self) { txn in
            lookup_note_by_key_with_txn(key, txn: txn)
        }
        if let rawNote = txn?.unsafeUnownedValue {
            return try lendingFunction(.some(UnownedNdbNote(rawNote)))
        }
        return try lendingFunction(nil)
    }
}
/// Looks up the note at `key` and returns an owned copy that is safe to retain
/// beyond the transaction's lifetime.
///
/// `lookup_note_by_key` already wraps the access in `withNdb`, so this method
/// does not wrap it a second time (the previous nested `withNdb` was redundant
/// and inconsistent with `lookup_note_and_copy`/`lookup_profile_and_copy`).
func lookup_note_by_key_and_copy(_ key: NoteKey) throws -> NdbNote? {
    return try lookup_note_by_key(key, borrow: { maybeUnownedNote -> NdbNote? in
        switch maybeUnownedNote {
        case .none: return nil
        case .some(let unownedNote): return unownedNote.toOwned()
        }
    })
}
/// Reads the raw profile flatbuffer at `key` within `txn` and decodes it.
private func lookup_profile_by_key_inner(_ key: ProfileKey, txn: RawNdbTxnAccessible) -> ProfileRecord? {
    var recordSize: Int = 0
    guard let recordPtr = ndb_get_profile_by_key(&txn.txn, key, &recordSize) else {
        return nil
    }
    return profile_flatbuf_to_record(ptr: recordPtr, size: recordSize, key: key)
}
/// Decodes the profile flatbuffer at `ptr`/`size` into a `ProfileRecord`.
/// Returns `nil` (and logs) when the flatbuffer fails validation.
private func profile_flatbuf_to_record(ptr: UnsafeMutableRawPointer, size: Int, key: UInt64) -> ProfileRecord? {
    var byteBuffer = ByteBuffer(assumingMemoryBound: ptr, capacity: size)
    do {
        let decoded: NdbProfileRecord = try getDebugCheckedRoot(byteBuffer: &byteBuffer)
        return ProfileRecord(data: decoded, key: key)
    } catch {
        // Decoding failures are unexpected; log and fall through to nil.
        print("UNUSUAL: \(error)")
        return nil
    }
}
/// Reads the note with the given `id` within `txn`. The result is unowned and
/// must not outlive the transaction.
private func lookup_note_with_txn_inner<Y>(id: NoteId, txn: NdbTxn<Y>) -> NdbNote? {
    id.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> NdbNote? in
        var noteKey: UInt64 = 0
        var noteSize: Int = 0
        guard let base = bytes.baseAddress,
              let notePtr = ndb_get_note_by_id(&txn.txn, base, &noteSize, &noteKey) else {
            return nil
        }
        return NdbNote(note: ndb_note_ptr(ptr: notePtr), size: noteSize, owned: false, key: noteKey)
    }
}
/// Reads and decodes the profile for `pubkey` within `txn`, or `nil` when absent.
private func lookup_profile_with_txn_inner(pubkey: Pubkey, txn: some RawNdbTxnAccessible) -> ProfileRecord? {
    var result: ProfileRecord? = nil
    pubkey.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) in
        var profileSize: Int = 0
        var profileKey: UInt64 = 0
        guard let base = bytes.baseAddress,
              let profilePtr = ndb_get_profile_by_pubkey(&txn.txn, base, &profileSize, &profileKey)
        else {
            return
        }
        result = profile_flatbuf_to_record(ptr: profilePtr, size: profileSize, key: profileKey)
    }
    return result
}
/// Transaction-scoped profile lookup by key (thin forwarding wrapper).
private func lookup_profile_by_key_with_txn(key: ProfileKey, txn: RawNdbTxnAccessible) -> ProfileRecord? {
    return lookup_profile_by_key_inner(key, txn: txn)
}
/// Looks up the profile at `key` and lends it (or `nil`) to `lendingFunction`
/// within a safe transaction.
func lookup_profile_by_key<T>(key: ProfileKey, borrow lendingFunction: (_: borrowing ProfileRecord?) throws -> T) throws -> T {
    try withNdb {
        let maybeTxn = SafeNdbTxn<ProfileRecord?>.new(on: self) { txn in
            lookup_profile_by_key_inner(key, txn: txn)
        }
        guard let maybeTxn else { return try lendingFunction(nil) }
        return try lendingFunction(maybeTxn.val)
    }
}
/// Transaction-scoped note lookup by id (thin forwarding wrapper).
private func lookup_note_with_txn<Y>(id: NoteId, txn: NdbTxn<Y>) -> NdbNote? {
    return lookup_note_with_txn_inner(id: id, txn: txn)
}
/// Resolves the internal profile key for `pubkey`, or `nil` when unknown.
func lookup_profile_key(_ pubkey: Pubkey) throws -> ProfileKey? {
    try withNdb {
        let txn = NdbTxn(ndb: self) { txn in
            lookup_profile_key_with_txn(pubkey, txn: txn)
        }
        // Optional chaining flattens: nil txn and nil value both yield nil.
        return txn?.value
    }
}
/// Transaction-scoped profile-key lookup for `pubkey`; `nil` when not found
/// (the C layer returns 0 for "no key").
private func lookup_profile_key_with_txn<Y>(_ pubkey: Pubkey, txn: NdbTxn<Y>) -> ProfileKey? {
    pubkey.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> NoteKey? in
        guard let base = bytes.baseAddress else { return nil }
        let key = ndb_get_profilekey_by_pubkey(&txn.txn, base)
        return key == 0 ? nil : key
    }
}
// GH_3245 TODO: This is a low level call, make it hidden from outside Ndb
/// Transaction-scoped note-key lookup for `id`; `nil` when not stored or the DB
/// is flagged closed (the C layer returns 0 for "no key").
internal func lookup_note_key_with_txn(_ id: NoteId, txn: some RawNdbTxnAccessible) -> NoteKey? {
    guard !closed else { return nil }
    return id.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> NoteKey? in
        guard let base = bytes.baseAddress else { return nil }
        let key = ndb_get_notekey_by_id(&txn.txn, base)
        return key == 0 ? nil : key
    }
}
/// Resolves the internal note key for `id`, or `nil` when not stored.
func lookup_note_key(_ id: NoteId) throws -> NoteKey? {
    try withNdb {
        let txn = NdbTxn(ndb: self) { txn in
            lookup_note_key_with_txn(id, txn: txn)
        }
        // Optional chaining flattens: nil txn and nil value both yield nil.
        return txn?.value
    }
}
/// Lends the note with id `id` (or `nil` when absent) to `lendingFunction`
/// within a transaction.
func lookup_note<T>(_ id: NoteId, borrow lendingFunction: (_: borrowing UnownedNdbNote?) throws -> T) throws -> T {
    try withNdb {
        let txn = NdbTxn(ndb: self) { txn in
            lookup_note_with_txn_inner(id: id, txn: txn)
        }
        if let rawNote = txn?.unsafeUnownedValue {
            return try lendingFunction(UnownedNdbNote(rawNote))
        }
        return try lendingFunction(nil)
    }
}
/// Looks up `id` and returns an owned copy safe to retain beyond the transaction.
func lookup_note_and_copy(_ id: NoteId) throws -> NdbNote? {
    return try self.lookup_note(id, borrow: { unownedNote in
        unownedNote?.toOwned()
    })
}
/// Lends the profile for `pubkey` (or `nil` when absent) to `lendingFunction`
/// within a safe transaction.
func lookup_profile<T>(_ pubkey: Pubkey, borrow lendingFunction: (_: borrowing ProfileRecord?) throws -> T) throws -> T {
    try withNdb {
        let maybeTxn = SafeNdbTxn<ProfileRecord?>.new(on: self) { txn in
            lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn)
        }
        guard let maybeTxn else { return try lendingFunction(nil) }
        return try lendingFunction(maybeTxn.val)
    }
}
/// Convenience accessor for a profile's LNURL; `nil` when no profile exists.
func lookup_profile_lnurl(_ pubkey: Pubkey) throws -> String? {
    return try lookup_profile(pubkey, borrow: { record in
        guard case .some(let record) = record else { return nil }
        return record.lnurl
    })
}
/// Looks up the profile for `pubkey` and returns an owned copy, or `nil`.
func lookup_profile_and_copy(_ pubkey: Pubkey) throws -> Profile? {
    return try self.lookup_profile(pubkey, borrow: { record in
        guard case .some(let record) = record else { return nil }
        return record.profile
    })
}
/// Transaction-scoped profile lookup by pubkey (thin forwarding wrapper).
private func lookup_profile_with_txn<Y>(_ pubkey: Pubkey, txn: NdbTxn<Y>) -> ProfileRecord? {
    return lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn)
}
/// Ingests a client-originated event JSON string.
/// - Returns: `false` on ingest failure or when the database is unavailable.
func process_client_event(_ str: String) -> Bool {
    let result = try? withNdb {
        str.withCString { cstr in
            ndb_process_client_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
        }
    }
    return result ?? false
}
/// Records when `pubkey`'s profile was last fetched from the network.
func write_profile_last_fetched(pubkey: Pubkey, fetched_at: UInt64) throws {
    try withNdb {
        let _ = pubkey.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> () in
            guard let base = bytes.baseAddress else { return }
            ndb_write_last_profile_fetch(ndb.ndb, base, fetched_at)
        }
    }
}
/// Reads the stored last-fetched timestamp for `pubkey` within `txn`; `nil`
/// when never recorded (the C layer returns 0) or the DB is flagged closed.
private func read_profile_last_fetched<Y>(txn: NdbTxn<Y>, pubkey: Pubkey) -> UInt64? {
    guard !closed else { return nil }
    return pubkey.id.withUnsafeBytes { (bytes: UnsafeRawBufferPointer) -> UInt64? in
        guard let base = bytes.baseAddress else { return nil }
        let fetchedAt = ndb_read_last_profile_fetch(&txn.txn, base)
        return fetchedAt == 0 ? nil : fetchedAt
    }
}
/// Reads the last-fetched timestamp for `pubkey`, or `nil` when never recorded.
func read_profile_last_fetched(pubkey: Pubkey) throws -> UInt64? {
    try withNdb {
        var result: UInt64? = nil
        let _ = NdbTxn(ndb: self) { txn in
            result = read_profile_last_fetched(txn: txn, pubkey: pubkey)
        }
        return result
    }
}
/// Ingests a relay-originated event JSON string.
/// - Parameters:
///   - str: The event JSON.
///   - originRelayURL: Optional origin relay URL recorded as ingest metadata.
/// - Returns: `false` on ingest failure or when the database is unavailable.
func process_event(_ str: String, originRelayURL: String? = nil) -> Bool {
    let response = try? withNdb({
        guard !is_closed else { return false }
        guard let originRelayURL else {
            return str.withCString { cstr in
                return ndb_process_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
            }
        }
        return str.withCString { cstr in
            return originRelayURL.withCString { originRelayCString in
                // Stack-allocate the ingest metadata rather than heap-allocating
                // it; it only needs to live for the duration of this call.
                var meta = ndb_ingest_meta()
                ndb_ingest_meta_init(&meta, 0, originRelayCString)
                return ndb_process_event_with(ndb.ndb, cstr, Int32(str.utf8.count), &meta) != 0
            }
        }
    })
    return response ?? false
}
/// Ingests a batch of relay events from a single string.
/// - Returns: `false` on ingest failure or when the database is unavailable.
func process_events(_ str: String) -> Bool {
    let result = try? withNdb {
        str.withCString { cstr in
            ndb_process_events(ndb.ndb, cstr, str.utf8.count) != 0
        }
    }
    return result ?? false
}
/// Searches stored profiles matching `search`, up to `limit` results.
func search_profile(_ search: String, limit: Int) throws -> [Pubkey] {
    try withNdb {
        guard let txn = NdbTxn<()>(ndb: self) else { return [] }
        return search_profile(search, limit: limit, txn: txn)
    }
}
/// Transaction-scoped profile search.
/// - Returns: Up to `limit` pubkeys whose profiles match `search`.
///
/// Fix: the previous implementation returned up to `limit + 1` results, because
/// it appended the first match and then counted down from `limit` for the rest.
/// It also ignored non-positive limits.
private func search_profile<Y>(_ search: String, limit: Int, txn: NdbTxn<Y>) -> [Pubkey] {
    var pks = Array<Pubkey>()
    guard limit > 0 else { return pks }
    return search.withCString { q in
        var s = ndb_search()
        guard ndb_search_profile(&txn.txn, &s, q) != 0 else {
            return pks
        }
        defer { ndb_search_profile_end(&s) }
        pks.append(Pubkey(Data(bytes: &s.key.pointee.id.0, count: 32)))
        // Keep advancing until we have `limit` results or run out of matches.
        while pks.count < limit {
            guard ndb_search_profile_next(&s) != 0 else {
                return pks
            }
            pks.append(Pubkey(Data(bytes: &s.key.pointee.id.0, count: 32)))
        }
        return pks
    }
}
// MARK: NdbFilter queries and subscriptions
/// Runs `filters` against the database and returns up to `maxResults` note keys.
func query(filters: [NdbFilter], maxResults: Int) throws -> [NoteKey] {
    try withNdb {
        guard let txn = NdbTxn(ndb: self) else { return [] }
        return try query(with: txn, filters: filters, maxResults: maxResults)
    }
}
/// Safe wrapper around the `ndb_query` C function
/// - Parameters:
///   - txn: Database transaction
///   - filters: Array of NdbFilter objects
///   - maxResults: Maximum number of results to return
/// - Returns: Array of note keys matching the filters
/// - Throws: NdbStreamError if the query fails
private func query<Y>(with txn: NdbTxn<Y>, filters: [NdbFilter], maxResults: Int) throws -> [NoteKey] {
    let filtersPointer = UnsafeMutablePointer<ndb_filter>.allocate(capacity: filters.count)
    defer { filtersPointer.deallocate() }
    for (index, ndbFilter) in filters.enumerated() {
        filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter
    }
    let results = UnsafeMutablePointer<ndb_query_result>.allocate(capacity: maxResults)
    defer { results.deallocate() }
    // A stack variable suffices for the out-parameter; the previous
    // single-element heap allocation was unnecessary.
    var count: Int32 = 0
    guard ndb_query(&txn.txn, filtersPointer, Int32(filters.count), results, Int32(maxResults), &count) == 1 else {
        throw NdbStreamError.initialQueryFailed
    }
    var noteIds: [NoteKey] = []
    noteIds.reserveCapacity(Int(count))
    for i in 0..<Int(count) {
        noteIds.append(results.advanced(by: i).pointee.note_id)
    }
    return noteIds
}
/// Safe wrapper around `ndb_subscribe` that handles all pointer management
/// - Parameters:
///   - filters: Array of NdbFilter objects
/// - Returns: AsyncStream of NoteKey events for new matches only
private func ndbSubscribe(filters: [NdbFilter]) -> AsyncStream<NoteKey> {
    return AsyncStream<NoteKey> { continuation in
        // Allocate filters pointer - will be deallocated when subscription ends
        // Cannot use `defer` to deallocate `filtersPointer` because it needs to remain valid for the lifetime of the subscription, which extends beyond this block's scope.
        let filtersPointer = UnsafeMutablePointer<ndb_filter>.allocate(capacity: filters.count)
        for (index, ndbFilter) in filters.enumerated() {
            filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter
        }
        var streaming = true
        var subid: UInt64 = 0
        var terminationStarted = false
        // Set up termination handler
        continuation.onTermination = { @Sendable _ in
            guard !terminationStarted else { return } // Avoid race conditions between two termination closures
            terminationStarted = true
            Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
            streaming = false
            // Clean up resources on early termination
            if subid != 0 {
                ndb_unsubscribe(self.ndb.ndb, subid)
                Task { await self.unsetContinuation(subscriptionId: subid) }
            }
            filtersPointer.deallocate()
        }
        if !streaming {
            return
        }
        // Set up subscription.
        // Fix: assign the new id into the captured `subid` var rather than
        // shadowing it with a new constant — otherwise the early-termination
        // handler above always sees `subid == 0` and can never unsubscribe if
        // termination races the setup below, leaking the subscription.
        guard let newSubid = try? withNdb({ ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count)) }) else { return }
        subid = newSubid
        // We are setting the continuation after issuing the subscription call.
        // This won't cause lost notes because if any notes get issued before registering
        // the continuation they will get queued by `Ndb.CallbackHandler`
        let continuationSetupTask = Task {
            await self.setContinuation(for: newSubid, continuation: continuation)
        }
        // Update termination handler to include subscription cleanup
        continuation.onTermination = { @Sendable _ in
            guard !terminationStarted else { return } // Avoid race conditions between two termination closures
            terminationStarted = true
            Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
            streaming = false
            continuationSetupTask.cancel()
            Task { await self.unsetContinuation(subscriptionId: newSubid) }
            filtersPointer.deallocate()
            guard !self.is_closed else { return } // Double-check Ndb is open before sending unsubscribe
            ndb_unsubscribe(self.ndb.ndb, newSubid)
        }
    }
}
/// Subscribes to notes matching the given filters.
///
/// Yields every matching note already stored in the database, then `.eose`,
/// then new matches as they are indexed, until the stream is terminated.
/// - Parameters:
///   - filters: the nostrdb filters to match notes against
///   - maxSimultaneousResults: cap on the number of pre-existing notes yielded before `.eose`
/// - Returns: an AsyncStream of `StreamItem` values
/// - Throws: `NdbStreamError` when the database is closed, the task is cancelled,
///   a transaction cannot be opened, or the initial query fails
func subscribe(filters: [NdbFilter], maxSimultaneousResults: Int = 1000) throws -> AsyncStream<StreamItem> {
    guard !self.is_closed else { throw NdbStreamError.ndbClosed }
    do { try Task.checkCancellation() } catch { throw NdbStreamError.cancelled }
    var noteIds: [NoteKey] = []
    let newEventsStream = try withNdb({
        // CRITICAL: Create the subscription FIRST before querying to avoid race condition
        // This ensures that any events indexed after subscription but before query won't be missed
        // NOTE(review): a note indexed between the subscribe and the query can be
        // delivered both before and after `.eose`; consumers should tolerate
        // duplicate note keys. TODO confirm.
        let newEventsStream = ndbSubscribe(filters: filters)
        // Now fetch initial results after subscription is registered
        guard let txn = NdbTxn(ndb: self) else { throw NdbStreamError.cannotOpenTransaction }
        // Use our safe wrapper instead of direct C function call
        noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults)
        do { try Task.checkCancellation() } catch { throw NdbStreamError.cancelled }
        return newEventsStream
    })
    // Create a cascading stream that combines initial results with new events
    return AsyncStream<StreamItem> { continuation in
        // Stream all results already present in the database
        for noteId in noteIds {
            if Task.isCancelled { return }
            continuation.yield(.event(noteId))
        }
        // Indicate this is the end of the results currently present in the database
        continuation.yield(.eose)
        // Create a task to forward events from the subscription stream
        let forwardingTask = Task {
            for await noteKey in newEventsStream {
                if Task.isCancelled { break }
                continuation.yield(.event(noteKey))
            }
            continuation.finish()
        }
        // Handle termination by canceling the forwarding task
        continuation.onTermination = { @Sendable _ in
            forwardingTask.cancel()
        }
    }
}
/// Waits indefinitely until a note with the given id appears in the database.
///
/// Subscribes on the note id and returns a pure transaction wrapping the note
/// once found. As the name states there is no timeout: if the note never
/// arrives, this only returns when the underlying stream finishes.
/// - Parameter noteId: the id of the note to wait for
/// - Returns: a transaction holding the matching note, or nil if the stream ends with no match
/// - Throws: `NdbLookupError` wrapping stream errors or internal inconsistencies
private func waitWithoutTimeout(for noteId: NoteId) async throws(NdbLookupError) -> NdbTxn<NdbNote>? {
    do {
        for try await item in try self.subscribe(filters: [NostrFilter(ids: [noteId])]) {
            switch item {
            case .eose:
                // End-of-stored-events carries no note; keep waiting for live matches
                continue
            case .event(let noteKey):
                guard let txn = NdbTxn(ndb: self) else { throw NdbLookupError.cannotOpenTransaction }
                guard let note = self.lookup_note_by_key_with_txn(noteKey, txn: txn) else { throw NdbLookupError.internalInconsistency }
                // Double-check the id: the filter should guarantee a match, but verify before returning
                if note.id == noteId {
                    Log.debug("ndb wait: %d has matching id %s. Returning transaction", for: .ndb, noteKey, noteId.hex())
                    return NdbTxn<NdbNote>.pure(ndb: self, val: note)
                }
            }
        }
    }
    catch {
        // Map any error surfaced by the stream into the typed NdbLookupError
        if let error = error as? NdbStreamError { throw NdbLookupError.streamError(error) }
        else if let error = error as? NdbLookupError { throw error }
        else { throw .internalInconsistency }
    }
    return nil
}
/// Determines if a given note was seen on a specific relay URL
/// - Parameters:
///   - noteKey: the note's key in nostrdb
///   - relayUrl: the relay URL to check against
///   - txn: an optional transaction to reuse; a fresh one is opened when nil
/// - Throws: `NdbLookupError.cannotOpenTransaction` when no transaction could be opened
private func was(noteKey: NoteKey, seenOn relayUrl: String, txn: SafeNdbTxn<()>?) throws -> Bool {
    guard let activeTxn = txn ?? SafeNdbTxn.new(on: self) else {
        throw NdbLookupError.cannotOpenTransaction
    }
    // Bridge the Swift string to a C string only for the duration of the C call
    return relayUrl.withCString { relayCStr in
        ndb_note_seen_on_relay(&activeTxn.txn, noteKey, relayCStr) == 1
    }
}
/// Determines if a given note was seen on a specific relay URL, using a fresh transaction.
/// - Throws: `NdbLookupError.cannotOpenTransaction` when no transaction could be opened
func was(noteKey: NoteKey, seenOn relayUrl: String) throws -> Bool {
    try withNdb { try self.was(noteKey: noteKey, seenOn: relayUrl, txn: nil) }
}
/// Determines if a given note was seen on any of the listed relay URLs
/// - Parameters:
///   - noteKey: the note's key in nostrdb
///   - relayUrls: relay URLs to check; checking short-circuits on the first match
///   - txn: an optional transaction to reuse; a fresh one is opened when nil
/// - Throws: `NdbLookupError.cannotOpenTransaction` when no transaction could be opened
private func was(noteKey: NoteKey, seenOnAnyOf relayUrls: [String], txn: SafeNdbTxn<()>? = nil) throws -> Bool {
    guard let activeTxn = txn ?? SafeNdbTxn.new(on: self) else {
        throw NdbLookupError.cannotOpenTransaction
    }
    // Reuse the single transaction for every per-relay check; stops at the first hit.
    return try relayUrls.contains { url in
        try self.was(noteKey: noteKey, seenOn: url, txn: activeTxn)
    }
}
/// Determines if a given note was seen on any of the listed relay URLs
/// - Throws: `NdbLookupError.cannotOpenTransaction` when no transaction could be opened
func was(noteKey: NoteKey, seenOnAnyOf relayUrls: [String]) throws -> Bool {
    try withNdb { try self.was(noteKey: noteKey, seenOnAnyOf: relayUrls, txn: nil) }
}
// MARK: Internal ndb callback interfaces
/// Registers a stream continuation to receive note keys for the given nostrdb subscription id.
internal func setContinuation(for subscriptionId: UInt64, continuation: AsyncStream<NoteKey>.Continuation) async {
    await callbackHandler.set(continuation: continuation, for: subscriptionId)
}
/// Deregisters the continuation (and any queued note keys) for the given nostrdb subscription id.
internal func unsetContinuation(subscriptionId: UInt64) async {
    await callbackHandler.unset(subid: subscriptionId)
}
// MARK: Helpers
/// Errors raised by Ndb file/path management.
enum Errors: Error {
    /// The database path could not be determined
    case cannot_find_db_path
    /// Migrating the database files to a new location failed
    case db_file_migration_error
}
// MARK: Deinitialization
deinit {
    // Debug trace using the same "txn:" prefix as transaction logging in this file
    print("txn: Ndb de-init")
    // Release the underlying database; close() is presumably guarded by `closed`
    // so a prior explicit close makes this a no-op — TODO confirm
    self.close()
}
}
// MARK: - Extensions and helper structures and functions
extension Ndb {
/// A class that is used to handle callbacks from nostrdb
///
/// This is a separate class from `Ndb` because it simplifies the initialization logic
actor CallbackHandler {
    /// Holds the ndb instance in the C codebase. Should be shared with `Ndb`
    var ndb: ndb_t? = nil
    /// A map from nostrdb subscription ids to stream continuations, which allows publishing note keys to active listeners
    var subscriptionContinuationMap: [UInt64: AsyncStream<NoteKey>.Continuation] = [:]
    /// A map from nostrdb subscription ids to queued note keys (for when there are no active listeners)
    var subscriptionQueueMap: [UInt64: [NoteKey]] = [:]
    /// Maximum number of items to queue per subscription when no one is listening
    let maxQueueItemsPerSubscription: Int = 2000

    /// Registers a continuation for a subscription id, first flushing any note
    /// keys that were queued while no listener was attached.
    func set(continuation: AsyncStream<NoteKey>.Continuation, for subid: UInt64) {
        // Flush any queued items to the new continuation
        if let queuedItems = subscriptionQueueMap[subid] {
            for noteKey in queuedItems {
                continuation.yield(noteKey)
            }
            subscriptionQueueMap[subid] = nil
        }
        subscriptionContinuationMap[subid] = continuation
    }

    /// Removes a subscription's continuation and discards any queued items.
    func unset(subid: UInt64) {
        subscriptionContinuationMap[subid] = nil
        subscriptionQueueMap[subid] = nil
    }

    /// Updates the shared ndb handle (set to nil when the database is closed).
    func set(ndb: ndb_t?) {
        self.ndb = ndb
    }

    /// Handles callbacks from nostrdb subscriptions, and routes them to the correct continuation or queue
    /// - Parameters:
    ///   - subId: the nostrdb subscription id being called back
    ///   - maxCapacity: maximum number of note keys to poll in one callback
    func handleSubscriptionCallback(subId: UInt64, maxCapacity: Int32 = 1000) {
        // Check the handle first so we don't allocate for a closed database.
        guard let ndb else { return }
        let result = UnsafeMutablePointer<UInt64>.allocate(capacity: Int(maxCapacity))
        defer { result.deallocate() } // Ensure we deallocate memory before leaving the function to avoid memory leaks
        let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity)
        // Defensive: a non-positive count (e.g. an error sentinel from the C side)
        // would otherwise crash the `0..<numberOfNotes` range below.
        guard numberOfNotes > 0 else { return }
        for i in 0..<numberOfNotes {
            let noteKey = result.advanced(by: Int(i)).pointee
            if let continuation = subscriptionContinuationMap[subId] {
                // Send directly to the active listener stream
                continuation.yield(noteKey)
            } else {
                // No one is listening, queue it for later
                var queue = subscriptionQueueMap[subId] ?? []
                // Drop the oldest entries in one call so that after appending the
                // queue never exceeds maxQueueItemsPerSubscription.
                let overflow = queue.count - maxQueueItemsPerSubscription + 1
                if overflow > 0 {
                    queue.removeFirst(overflow)
                }
                queue.append(noteKey)
                subscriptionQueueMap[subId] = queue
            }
        }
    }
}
/// An item that comes out of a subscription stream
enum StreamItem {
    /// End of currently stored events; items after this were indexed after the subscription started
    case eose
    /// An event in NostrDB available at the given note key
    case event(NoteKey)
}
/// An error that may happen during nostrdb streaming
enum NdbStreamError: Error {
    /// A read transaction could not be opened on the database
    case cannotOpenTransaction
    /// A filter could not be converted to its nostrdb representation, wrapping the underlying error
    case cannotConvertFilter(any Error)
    /// The initial query for already-stored notes failed
    case initialQueryFailed
    /// The stream timed out
    case timeout
    /// The enclosing task was cancelled
    case cancelled
    /// The database was closed before or during streaming
    case ndbClosed
}
/// An error that may happen when looking something up
enum NdbLookupError: Error {
    /// A read transaction could not be opened on the database
    case cannotOpenTransaction
    /// The underlying subscription stream failed, wrapping the stream error
    case streamError(NdbStreamError)
    /// The database returned data that contradicts its own indices (e.g. a key with no note)
    case internalInconsistency
    /// The lookup timed out
    case timeout
    /// No matching record was found
    case notFound
}
/// A catch-all error type for miscellaneous Ndb operations
enum OperationError: Error {
    /// An unspecified failure
    case genericError
}
}
extension Ndb {
/// Get detailed storage statistics for this database
///
/// This method calls the C `ndb_stat` function to retrieve per-database
/// storage statistics from the underlying LMDB storage. Each database
/// (notes, profiles, indices, etc.) is reported with its key and value sizes.
///
/// Any unaccounted space between the sum of database stats and the physical
/// file size is reported as "Other Data".
///
/// - Parameter physicalSize: The physical file size in bytes from the filesystem
/// - Returns: NdbStats with detailed per-database breakdown, or nil if stat collection fails
func getStats(physicalSize: UInt64) -> NdbStats? {
    // All of this must be done under withNdb to avoid races with close()/ndb_destroy
    let copiedStats: ([NdbDatabaseStats], UInt64)? = try? withNdb({
        var stat = ndb_stat()
        // Call C ndb_stat function
        // NOTE(review): any non-zero return is treated as success here — confirm
        // this matches nostrdb's ndb_stat return convention.
        let result = ndb_stat(self.ndb.ndb, &stat)
        guard result != 0 else {
            Log.error("ndb_stat failed", for: .storage)
            return nil
        }
        var databaseStats: [NdbDatabaseStats] = []
        var accountedSize: UInt64 = 0
        // Extract per-database stats from stat.dbs array
        // The fixed-size C array field is reinterpreted as a buffer of
        // ndb_stat_counts so it can be iterated by index.
        withUnsafePointer(to: &stat.dbs) { dbsPtr in
            let dbsBuffer = UnsafeRawPointer(dbsPtr).assumingMemoryBound(to: ndb_stat_counts.self)
            for dbIndex in 0..<Int(NDB_DBS.rawValue) {
                let dbStat = dbsBuffer[dbIndex]
                // Skip databases with no data
                guard dbStat.key_size > 0 || dbStat.value_size > 0 else { continue }
                // Get database type from index
                let database = NdbDatabase(fromIndex: dbIndex)
                let dbStats = NdbDatabaseStats(
                    database: database,
                    keySize: UInt64(dbStat.key_size),
                    valueSize: UInt64(dbStat.value_size)
                )
                databaseStats.append(dbStats)
                accountedSize += dbStats.totalSize
            }
        }
        // Copy plain Swift values out of the withNdb scope so no C memory escapes it.
        return (databaseStats, accountedSize)
    })
    guard let (databaseStatsRaw, accountedSize) = copiedStats else { return nil }
    var databaseStats = databaseStatsRaw
    // Add "Other Data" for any unaccounted space
    if physicalSize > accountedSize {
        let otherSize = physicalSize - accountedSize
        databaseStats.append(NdbDatabaseStats(
            database: .other,
            keySize: 0,
            valueSize: otherSize
        ))
    }
    // Sort by total size descending to show largest databases first
    databaseStats.sort { $0.totalSize > $1.totalSize }
    return NdbStats(databaseStats: databaseStats)
}
}
/// This callback "trampoline" function will be called when new notes arrive for NostrDB subscriptions.
///
/// This is needed as a separate global function in order to allow us to pass it to the C code as a callback (We can't pass native Swift fuctions directly as callbacks).
///
/// - Parameters:
///   - ctx: A pointer to a context object setup during initialization. This allows this function to "find" the correct place to call. MUST be a pointer to a `CallbackHandler`, otherwise this will trigger a crash
///   - subid: The NostrDB subscription ID, which identifies the subscription that is being called back
@_cdecl("subscription_callback")
public func subscription_callback(ctx: UnsafeMutableRawPointer?, subid: UInt64) {
    guard let ctx else { return }
    // Recover the handler without consuming a retain; ownership stays with whoever
    // created the opaque pointer.
    let handler = Unmanaged<Ndb.CallbackHandler>.fromOpaque(ctx).takeUnretainedValue()
    // Hop onto the actor asynchronously — this synchronous C callback cannot await.
    Task {
        await handler.handleSubscriptionCallback(subId: subid)
    }
}
/// Decodes the root flatbuffer object from `byteBuffer`.
///
/// NOTE(review): the previous `#if DEBUG` / `#else` branches were byte-identical,
/// so the conditional compilation has been collapsed. If a verified debug-only
/// decode path (e.g. `getCheckedRoot`) is intended, reintroduce the `#if DEBUG`
/// branch with the checked call.
/// - Parameter byteBuffer: buffer containing a serialized flatbuffer
/// - Returns: the decoded root object
func getDebugCheckedRoot<T: FlatBufferObject>(byteBuffer: inout ByteBuffer) throws -> T {
    return getRoot(byteBuffer: &byteBuffer)
}
/// Strips a leading `file://` scheme from a URL string, returning a plain filesystem path.
///
/// Unlike the previous blanket `replacingOccurrences`, this removes the scheme only
/// when it appears as a prefix — as the function name promises — so paths that merely
/// contain `file://` somewhere in the middle are left intact.
/// - Parameter str: a URL string such as `file:///tmp/foo`
/// - Returns: `str` without its leading `file://`, or `str` unchanged when the prefix is absent
func remove_file_prefix(_ str: String) -> String {
    guard str.hasPrefix("file://") else { return str }
    return String(str.dropFirst("file://".count))
}
// MARK: - NostrDB Storage Statistics
/// NostrDB database types corresponding to the ndb_dbs C enum
enum NdbDatabase: Int, Hashable, CaseIterable, Identifiable {
    case note = 0 // NDB_DB_NOTE
    case meta = 1 // NDB_DB_META
    case profile = 2 // NDB_DB_PROFILE
    case noteId = 3 // NDB_DB_NOTE_ID
    case profileKey = 4 // NDB_DB_PROFILE_PK
    case ndbMeta = 5 // NDB_DB_NDB_META
    case profileSearch = 6 // NDB_DB_PROFILE_SEARCH
    case profileLastFetch = 7 // NDB_DB_PROFILE_LAST_FETCH
    case noteKind = 8 // NDB_DB_NOTE_KIND
    case noteText = 9 // NDB_DB_NOTE_TEXT
    case noteBlocks = 10 // NDB_DB_NOTE_BLOCKS
    case noteTags = 11 // NDB_DB_NOTE_TAGS
    case notePubkey = 12 // NDB_DB_NOTE_PUBKEY
    case notePubkeyKind = 13 // NDB_DB_NOTE_PUBKEY_KIND
    case noteRelayKind = 14 // NDB_DB_NOTE_RELAY_KIND
    case noteRelays = 15 // NDB_DB_NOTE_RELAYS
    case other // For unaccounted data

    /// Stable identifier for `Identifiable`, derived from the raw value
    var id: String { String(rawValue) }

    /// Database index matching the C ndb_dbs enum
    var index: Int { rawValue }

    /// Initialize from database index (matching ndb_dbs C enum order);
    /// any index outside the known enum maps to `.other`
    init(fromIndex index: Int) {
        self = NdbDatabase(rawValue: index) ?? .other
    }
}
/// Per-database storage statistics from NostrDB
struct NdbDatabaseStats: Hashable {
    /// Which NostrDB database these counts describe
    let database: NdbDatabase
    /// Total bytes consumed by keys in this database
    let keySize: UInt64
    /// Total bytes consumed by values in this database
    let valueSize: UInt64
    /// Combined storage used by this database (keys + values)
    var totalSize: UInt64 { keySize + valueSize }
}
/// Detailed NostrDB storage statistics with per-database breakdown
struct NdbStats: Hashable {
    /// Per-database breakdown of storage (notes, profiles, indices, etc.)
    let databaseStats: [NdbDatabaseStats]
    /// Total storage across all databases
    var totalSize: UInt64 {
        databaseStats.map(\.totalSize).reduce(0, +)
    }
}