Add sync mechanism to prevent background crashes and fix ndb reopen order
This adds a sync mechanism in Ndb.swift to coordinate certain usage of nostrdb.c calls and the need to close nostrdb due to app lifecycle requirements. Furthermore, it fixes the order of operations when re-opening NostrDB, to avoid race conditions where a query uses an older Ndb generation. This sync mechanism allows multiple queries to happen simultaneously (from the Swift-side), while preventing ndb from simultaneously closing during such usages. It also does that while keeping the Ndb interface sync and nonisolated, which keeps the API easy to use from Swift/SwiftUI and allows for parallel operations to occur. If Swift Actors were to be used (e.g. creating an NdbActor), the Ndb.swift interface would change in such a way that it would propagate the need for several changes throughout the codebase, including loading logic in some ViewModels. Furthermore, it would likely decrease performance by forcing Ndb.swift operations to run sequentially when they could run in parallel. Changelog-Fixed: Fixed crashes that happened when the app went into background mode Closes: https://github.com/damus-io/damus/issues/3245 Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
+332
-270
@@ -7,6 +7,7 @@
|
||||
|
||||
import Foundation
|
||||
import OSLog
|
||||
import Synchronization
|
||||
|
||||
fileprivate let APPLICATION_GROUP_IDENTIFIER = "group.com.damus"
|
||||
|
||||
@@ -34,6 +35,7 @@ class Ndb {
|
||||
var generation: Int
|
||||
private var closed: Bool
|
||||
private var callbackHandler: Ndb.CallbackHandler
|
||||
private let ndbAccessLock: Ndb.UseLockProtocol = initLock()
|
||||
|
||||
private static let DEFAULT_WRITER_SCRATCH_SIZE: Int32 = 2097152; // 2mb scratch size for the writer thread, it should match with the one specified in nostrdb.c
|
||||
|
||||
@@ -158,6 +160,7 @@ class Ndb {
|
||||
self.ndb = db
|
||||
self.closed = false
|
||||
self.callbackHandler = callbackHandler
|
||||
self.ndbAccessLock.markNdbOpen()
|
||||
}
|
||||
|
||||
private static func migrate_db_location_if_needed() throws {
|
||||
@@ -206,7 +209,7 @@ class Ndb {
|
||||
// This simple initialization will cause subscriptions not to be ever called. Probably fine because this initializer is used only for empty example ndb instances.
|
||||
self.callbackHandler = Ndb.CallbackHandler()
|
||||
}
|
||||
|
||||
|
||||
/// Mark NostrDB as "closed" without actually closing it.
|
||||
/// Useful when shutting down tasks that use NostrDB while avoiding new tasks from using it.
|
||||
func markClosed() {
|
||||
@@ -216,10 +219,13 @@ class Ndb {
|
||||
func close() {
|
||||
guard !self.is_closed else { return }
|
||||
self.closed = true
|
||||
print("txn: CLOSING NOSTRDB")
|
||||
ndb_destroy(self.ndb.ndb)
|
||||
self.generation += 1
|
||||
print("txn: NOSTRDB CLOSED")
|
||||
try! self.ndbAccessLock.waitUntilNdbCanClose(thenClose: {
|
||||
print("txn: CLOSING NOSTRDB")
|
||||
ndb_destroy(self.ndb.ndb)
|
||||
self.generation += 1
|
||||
print("txn: NOSTRDB CLOSED")
|
||||
return false
|
||||
}, maxTimeout: .milliseconds(2000))
|
||||
}
|
||||
|
||||
func reopen() -> Bool {
|
||||
@@ -229,12 +235,29 @@ class Ndb {
|
||||
}
|
||||
|
||||
print("txn: NOSTRDB REOPENED (gen \(generation))")
|
||||
|
||||
|
||||
self.ndb = db // Set the new DB before marking it as open to prevent access to the old DB
|
||||
self.closed = false
|
||||
self.ndb = db
|
||||
self.ndbAccessLock.markNdbOpen()
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
// MARK: Thread safety mechanisms
|
||||
// Use these for all externally accessible methods that interact with the nostrdb database to prevent race conditions with app lifecycle events (i.e. NostrDB opening and closing)
|
||||
|
||||
internal func withNdb<T>(_ useFunction: () throws -> T, maxWaitTimeout: DispatchTimeInterval = .milliseconds(500)) throws -> T {
|
||||
guard !self.is_closed else { throw NdbStreamError.ndbClosed }
|
||||
return try self.ndbAccessLock.keepNdbOpen(during: {
|
||||
// Double-check things to avoid TOCTOU race conditions
|
||||
guard !self.is_closed else { throw NdbStreamError.ndbClosed }
|
||||
return try useFunction()
|
||||
}, maxWaitTimeout: maxWaitTimeout)
|
||||
}
|
||||
|
||||
|
||||
// MARK: Lookup and query functions
|
||||
|
||||
// GH_3245 TODO: This is a low level call, make it hidden from outside Ndb
|
||||
internal func lookup_blocks_by_key_with_txn(_ key: NoteKey, txn: RawNdbTxnAccessible) -> NdbBlockGroup.BlocksMetadata? {
|
||||
guard let blocks = ndb_get_blocks_by_key(self.ndb.ndb, &txn.txn, key) else {
|
||||
@@ -244,14 +267,16 @@ class Ndb {
|
||||
return NdbBlockGroup.BlocksMetadata(ptr: blocks)
|
||||
}
|
||||
|
||||
func lookup_blocks_by_key<T>(_ key: NoteKey, borrow lendingFunction: (_: borrowing NdbBlockGroup.BlocksMetadata?) throws -> T) rethrows -> T {
|
||||
let txn = SafeNdbTxn<NdbBlockGroup.BlocksMetadata?>.new(on: self) { txn in
|
||||
lookup_blocks_by_key_with_txn(key, txn: txn)
|
||||
}
|
||||
guard let txn else {
|
||||
return try lendingFunction(nil)
|
||||
}
|
||||
return try lendingFunction(txn.val)
|
||||
func lookup_blocks_by_key<T>(_ key: NoteKey, borrow lendingFunction: (_: borrowing NdbBlockGroup.BlocksMetadata?) throws -> T) throws -> T {
|
||||
return try withNdb({
|
||||
let txn = SafeNdbTxn<NdbBlockGroup.BlocksMetadata?>.new(on: self) { txn in
|
||||
lookup_blocks_by_key_with_txn(key, txn: txn)
|
||||
}
|
||||
guard let txn else {
|
||||
return try lendingFunction(nil)
|
||||
}
|
||||
return try lendingFunction(txn.val)
|
||||
})
|
||||
}
|
||||
|
||||
private func lookup_note_by_key_with_txn<Y>(_ key: NoteKey, txn: NdbTxn<Y>) -> NdbNote? {
|
||||
@@ -263,174 +288,180 @@ class Ndb {
|
||||
return NdbNote(note: ptr, size: size, owned: false, key: key)
|
||||
}
|
||||
|
||||
func text_search(query: String, limit: Int = 128, order: NdbSearchOrder = .newest_first) -> [NoteKey] {
|
||||
guard let txn = NdbTxn(ndb: self) else { return [] }
|
||||
var results = ndb_text_search_results()
|
||||
let res = query.withCString { q in
|
||||
let order = order == .newest_first ? NDB_ORDER_DESCENDING : NDB_ORDER_ASCENDING
|
||||
var config = ndb_text_search_config(order: order, limit: Int32(limit))
|
||||
return ndb_text_search(&txn.txn, q, &results, &config)
|
||||
}
|
||||
|
||||
if res == 0 {
|
||||
return []
|
||||
}
|
||||
|
||||
var note_ids = [NoteKey]()
|
||||
for i in 0..<results.num_results {
|
||||
// seriously wtf
|
||||
switch i {
|
||||
case 0: note_ids.append(results.results.0.key.note_id)
|
||||
case 1: note_ids.append(results.results.1.key.note_id)
|
||||
case 2: note_ids.append(results.results.2.key.note_id)
|
||||
case 3: note_ids.append(results.results.3.key.note_id)
|
||||
case 4: note_ids.append(results.results.4.key.note_id)
|
||||
case 5: note_ids.append(results.results.5.key.note_id)
|
||||
case 6: note_ids.append(results.results.6.key.note_id)
|
||||
case 7: note_ids.append(results.results.7.key.note_id)
|
||||
case 8: note_ids.append(results.results.8.key.note_id)
|
||||
case 9: note_ids.append(results.results.9.key.note_id)
|
||||
case 10: note_ids.append(results.results.10.key.note_id)
|
||||
case 11: note_ids.append(results.results.11.key.note_id)
|
||||
case 12: note_ids.append(results.results.12.key.note_id)
|
||||
case 13: note_ids.append(results.results.13.key.note_id)
|
||||
case 14: note_ids.append(results.results.14.key.note_id)
|
||||
case 15: note_ids.append(results.results.15.key.note_id)
|
||||
case 16: note_ids.append(results.results.16.key.note_id)
|
||||
case 17: note_ids.append(results.results.17.key.note_id)
|
||||
case 18: note_ids.append(results.results.18.key.note_id)
|
||||
case 19: note_ids.append(results.results.19.key.note_id)
|
||||
case 20: note_ids.append(results.results.20.key.note_id)
|
||||
case 21: note_ids.append(results.results.21.key.note_id)
|
||||
case 22: note_ids.append(results.results.22.key.note_id)
|
||||
case 23: note_ids.append(results.results.23.key.note_id)
|
||||
case 24: note_ids.append(results.results.24.key.note_id)
|
||||
case 25: note_ids.append(results.results.25.key.note_id)
|
||||
case 26: note_ids.append(results.results.26.key.note_id)
|
||||
case 27: note_ids.append(results.results.27.key.note_id)
|
||||
case 28: note_ids.append(results.results.28.key.note_id)
|
||||
case 29: note_ids.append(results.results.29.key.note_id)
|
||||
case 30: note_ids.append(results.results.30.key.note_id)
|
||||
case 31: note_ids.append(results.results.31.key.note_id)
|
||||
case 32: note_ids.append(results.results.32.key.note_id)
|
||||
case 33: note_ids.append(results.results.33.key.note_id)
|
||||
case 34: note_ids.append(results.results.34.key.note_id)
|
||||
case 35: note_ids.append(results.results.35.key.note_id)
|
||||
case 36: note_ids.append(results.results.36.key.note_id)
|
||||
case 37: note_ids.append(results.results.37.key.note_id)
|
||||
case 38: note_ids.append(results.results.38.key.note_id)
|
||||
case 39: note_ids.append(results.results.39.key.note_id)
|
||||
case 40: note_ids.append(results.results.40.key.note_id)
|
||||
case 41: note_ids.append(results.results.41.key.note_id)
|
||||
case 42: note_ids.append(results.results.42.key.note_id)
|
||||
case 43: note_ids.append(results.results.43.key.note_id)
|
||||
case 44: note_ids.append(results.results.44.key.note_id)
|
||||
case 45: note_ids.append(results.results.45.key.note_id)
|
||||
case 46: note_ids.append(results.results.46.key.note_id)
|
||||
case 47: note_ids.append(results.results.47.key.note_id)
|
||||
case 48: note_ids.append(results.results.48.key.note_id)
|
||||
case 49: note_ids.append(results.results.49.key.note_id)
|
||||
case 50: note_ids.append(results.results.50.key.note_id)
|
||||
case 51: note_ids.append(results.results.51.key.note_id)
|
||||
case 52: note_ids.append(results.results.52.key.note_id)
|
||||
case 53: note_ids.append(results.results.53.key.note_id)
|
||||
case 54: note_ids.append(results.results.54.key.note_id)
|
||||
case 55: note_ids.append(results.results.55.key.note_id)
|
||||
case 56: note_ids.append(results.results.56.key.note_id)
|
||||
case 57: note_ids.append(results.results.57.key.note_id)
|
||||
case 58: note_ids.append(results.results.58.key.note_id)
|
||||
case 59: note_ids.append(results.results.59.key.note_id)
|
||||
case 60: note_ids.append(results.results.60.key.note_id)
|
||||
case 61: note_ids.append(results.results.61.key.note_id)
|
||||
case 62: note_ids.append(results.results.62.key.note_id)
|
||||
case 63: note_ids.append(results.results.63.key.note_id)
|
||||
case 64: note_ids.append(results.results.64.key.note_id)
|
||||
case 65: note_ids.append(results.results.65.key.note_id)
|
||||
case 66: note_ids.append(results.results.66.key.note_id)
|
||||
case 67: note_ids.append(results.results.67.key.note_id)
|
||||
case 68: note_ids.append(results.results.68.key.note_id)
|
||||
case 69: note_ids.append(results.results.69.key.note_id)
|
||||
case 70: note_ids.append(results.results.70.key.note_id)
|
||||
case 71: note_ids.append(results.results.71.key.note_id)
|
||||
case 72: note_ids.append(results.results.72.key.note_id)
|
||||
case 73: note_ids.append(results.results.73.key.note_id)
|
||||
case 74: note_ids.append(results.results.74.key.note_id)
|
||||
case 75: note_ids.append(results.results.75.key.note_id)
|
||||
case 76: note_ids.append(results.results.76.key.note_id)
|
||||
case 77: note_ids.append(results.results.77.key.note_id)
|
||||
case 78: note_ids.append(results.results.78.key.note_id)
|
||||
case 79: note_ids.append(results.results.79.key.note_id)
|
||||
case 80: note_ids.append(results.results.80.key.note_id)
|
||||
case 81: note_ids.append(results.results.81.key.note_id)
|
||||
case 82: note_ids.append(results.results.82.key.note_id)
|
||||
case 83: note_ids.append(results.results.83.key.note_id)
|
||||
case 84: note_ids.append(results.results.84.key.note_id)
|
||||
case 85: note_ids.append(results.results.85.key.note_id)
|
||||
case 86: note_ids.append(results.results.86.key.note_id)
|
||||
case 87: note_ids.append(results.results.87.key.note_id)
|
||||
case 88: note_ids.append(results.results.88.key.note_id)
|
||||
case 89: note_ids.append(results.results.89.key.note_id)
|
||||
case 90: note_ids.append(results.results.90.key.note_id)
|
||||
case 91: note_ids.append(results.results.91.key.note_id)
|
||||
case 92: note_ids.append(results.results.92.key.note_id)
|
||||
case 93: note_ids.append(results.results.93.key.note_id)
|
||||
case 94: note_ids.append(results.results.94.key.note_id)
|
||||
case 95: note_ids.append(results.results.95.key.note_id)
|
||||
case 96: note_ids.append(results.results.96.key.note_id)
|
||||
case 97: note_ids.append(results.results.97.key.note_id)
|
||||
case 98: note_ids.append(results.results.98.key.note_id)
|
||||
case 99: note_ids.append(results.results.99.key.note_id)
|
||||
case 100: note_ids.append(results.results.100.key.note_id)
|
||||
case 101: note_ids.append(results.results.101.key.note_id)
|
||||
case 102: note_ids.append(results.results.102.key.note_id)
|
||||
case 103: note_ids.append(results.results.103.key.note_id)
|
||||
case 104: note_ids.append(results.results.104.key.note_id)
|
||||
case 105: note_ids.append(results.results.105.key.note_id)
|
||||
case 106: note_ids.append(results.results.106.key.note_id)
|
||||
case 107: note_ids.append(results.results.107.key.note_id)
|
||||
case 108: note_ids.append(results.results.108.key.note_id)
|
||||
case 109: note_ids.append(results.results.109.key.note_id)
|
||||
case 110: note_ids.append(results.results.110.key.note_id)
|
||||
case 111: note_ids.append(results.results.111.key.note_id)
|
||||
case 112: note_ids.append(results.results.112.key.note_id)
|
||||
case 113: note_ids.append(results.results.113.key.note_id)
|
||||
case 114: note_ids.append(results.results.114.key.note_id)
|
||||
case 115: note_ids.append(results.results.115.key.note_id)
|
||||
case 116: note_ids.append(results.results.116.key.note_id)
|
||||
case 117: note_ids.append(results.results.117.key.note_id)
|
||||
case 118: note_ids.append(results.results.118.key.note_id)
|
||||
case 119: note_ids.append(results.results.119.key.note_id)
|
||||
case 120: note_ids.append(results.results.120.key.note_id)
|
||||
case 121: note_ids.append(results.results.121.key.note_id)
|
||||
case 122: note_ids.append(results.results.122.key.note_id)
|
||||
case 123: note_ids.append(results.results.123.key.note_id)
|
||||
case 124: note_ids.append(results.results.124.key.note_id)
|
||||
case 125: note_ids.append(results.results.125.key.note_id)
|
||||
case 126: note_ids.append(results.results.126.key.note_id)
|
||||
case 127: note_ids.append(results.results.127.key.note_id)
|
||||
default:
|
||||
break
|
||||
func text_search(query: String, limit: Int = 128, order: NdbSearchOrder = .newest_first) throws -> [NoteKey] {
|
||||
return try withNdb({
|
||||
guard let txn = NdbTxn(ndb: self) else { return [] }
|
||||
var results = ndb_text_search_results()
|
||||
let res = query.withCString { q in
|
||||
let order = order == .newest_first ? NDB_ORDER_DESCENDING : NDB_ORDER_ASCENDING
|
||||
var config = ndb_text_search_config(order: order, limit: Int32(limit))
|
||||
return ndb_text_search(&txn.txn, q, &results, &config)
|
||||
}
|
||||
}
|
||||
|
||||
return note_ids
|
||||
if res == 0 {
|
||||
return []
|
||||
}
|
||||
|
||||
var note_ids = [NoteKey]()
|
||||
for i in 0..<results.num_results {
|
||||
// seriously wtf
|
||||
switch i {
|
||||
case 0: note_ids.append(results.results.0.key.note_id)
|
||||
case 1: note_ids.append(results.results.1.key.note_id)
|
||||
case 2: note_ids.append(results.results.2.key.note_id)
|
||||
case 3: note_ids.append(results.results.3.key.note_id)
|
||||
case 4: note_ids.append(results.results.4.key.note_id)
|
||||
case 5: note_ids.append(results.results.5.key.note_id)
|
||||
case 6: note_ids.append(results.results.6.key.note_id)
|
||||
case 7: note_ids.append(results.results.7.key.note_id)
|
||||
case 8: note_ids.append(results.results.8.key.note_id)
|
||||
case 9: note_ids.append(results.results.9.key.note_id)
|
||||
case 10: note_ids.append(results.results.10.key.note_id)
|
||||
case 11: note_ids.append(results.results.11.key.note_id)
|
||||
case 12: note_ids.append(results.results.12.key.note_id)
|
||||
case 13: note_ids.append(results.results.13.key.note_id)
|
||||
case 14: note_ids.append(results.results.14.key.note_id)
|
||||
case 15: note_ids.append(results.results.15.key.note_id)
|
||||
case 16: note_ids.append(results.results.16.key.note_id)
|
||||
case 17: note_ids.append(results.results.17.key.note_id)
|
||||
case 18: note_ids.append(results.results.18.key.note_id)
|
||||
case 19: note_ids.append(results.results.19.key.note_id)
|
||||
case 20: note_ids.append(results.results.20.key.note_id)
|
||||
case 21: note_ids.append(results.results.21.key.note_id)
|
||||
case 22: note_ids.append(results.results.22.key.note_id)
|
||||
case 23: note_ids.append(results.results.23.key.note_id)
|
||||
case 24: note_ids.append(results.results.24.key.note_id)
|
||||
case 25: note_ids.append(results.results.25.key.note_id)
|
||||
case 26: note_ids.append(results.results.26.key.note_id)
|
||||
case 27: note_ids.append(results.results.27.key.note_id)
|
||||
case 28: note_ids.append(results.results.28.key.note_id)
|
||||
case 29: note_ids.append(results.results.29.key.note_id)
|
||||
case 30: note_ids.append(results.results.30.key.note_id)
|
||||
case 31: note_ids.append(results.results.31.key.note_id)
|
||||
case 32: note_ids.append(results.results.32.key.note_id)
|
||||
case 33: note_ids.append(results.results.33.key.note_id)
|
||||
case 34: note_ids.append(results.results.34.key.note_id)
|
||||
case 35: note_ids.append(results.results.35.key.note_id)
|
||||
case 36: note_ids.append(results.results.36.key.note_id)
|
||||
case 37: note_ids.append(results.results.37.key.note_id)
|
||||
case 38: note_ids.append(results.results.38.key.note_id)
|
||||
case 39: note_ids.append(results.results.39.key.note_id)
|
||||
case 40: note_ids.append(results.results.40.key.note_id)
|
||||
case 41: note_ids.append(results.results.41.key.note_id)
|
||||
case 42: note_ids.append(results.results.42.key.note_id)
|
||||
case 43: note_ids.append(results.results.43.key.note_id)
|
||||
case 44: note_ids.append(results.results.44.key.note_id)
|
||||
case 45: note_ids.append(results.results.45.key.note_id)
|
||||
case 46: note_ids.append(results.results.46.key.note_id)
|
||||
case 47: note_ids.append(results.results.47.key.note_id)
|
||||
case 48: note_ids.append(results.results.48.key.note_id)
|
||||
case 49: note_ids.append(results.results.49.key.note_id)
|
||||
case 50: note_ids.append(results.results.50.key.note_id)
|
||||
case 51: note_ids.append(results.results.51.key.note_id)
|
||||
case 52: note_ids.append(results.results.52.key.note_id)
|
||||
case 53: note_ids.append(results.results.53.key.note_id)
|
||||
case 54: note_ids.append(results.results.54.key.note_id)
|
||||
case 55: note_ids.append(results.results.55.key.note_id)
|
||||
case 56: note_ids.append(results.results.56.key.note_id)
|
||||
case 57: note_ids.append(results.results.57.key.note_id)
|
||||
case 58: note_ids.append(results.results.58.key.note_id)
|
||||
case 59: note_ids.append(results.results.59.key.note_id)
|
||||
case 60: note_ids.append(results.results.60.key.note_id)
|
||||
case 61: note_ids.append(results.results.61.key.note_id)
|
||||
case 62: note_ids.append(results.results.62.key.note_id)
|
||||
case 63: note_ids.append(results.results.63.key.note_id)
|
||||
case 64: note_ids.append(results.results.64.key.note_id)
|
||||
case 65: note_ids.append(results.results.65.key.note_id)
|
||||
case 66: note_ids.append(results.results.66.key.note_id)
|
||||
case 67: note_ids.append(results.results.67.key.note_id)
|
||||
case 68: note_ids.append(results.results.68.key.note_id)
|
||||
case 69: note_ids.append(results.results.69.key.note_id)
|
||||
case 70: note_ids.append(results.results.70.key.note_id)
|
||||
case 71: note_ids.append(results.results.71.key.note_id)
|
||||
case 72: note_ids.append(results.results.72.key.note_id)
|
||||
case 73: note_ids.append(results.results.73.key.note_id)
|
||||
case 74: note_ids.append(results.results.74.key.note_id)
|
||||
case 75: note_ids.append(results.results.75.key.note_id)
|
||||
case 76: note_ids.append(results.results.76.key.note_id)
|
||||
case 77: note_ids.append(results.results.77.key.note_id)
|
||||
case 78: note_ids.append(results.results.78.key.note_id)
|
||||
case 79: note_ids.append(results.results.79.key.note_id)
|
||||
case 80: note_ids.append(results.results.80.key.note_id)
|
||||
case 81: note_ids.append(results.results.81.key.note_id)
|
||||
case 82: note_ids.append(results.results.82.key.note_id)
|
||||
case 83: note_ids.append(results.results.83.key.note_id)
|
||||
case 84: note_ids.append(results.results.84.key.note_id)
|
||||
case 85: note_ids.append(results.results.85.key.note_id)
|
||||
case 86: note_ids.append(results.results.86.key.note_id)
|
||||
case 87: note_ids.append(results.results.87.key.note_id)
|
||||
case 88: note_ids.append(results.results.88.key.note_id)
|
||||
case 89: note_ids.append(results.results.89.key.note_id)
|
||||
case 90: note_ids.append(results.results.90.key.note_id)
|
||||
case 91: note_ids.append(results.results.91.key.note_id)
|
||||
case 92: note_ids.append(results.results.92.key.note_id)
|
||||
case 93: note_ids.append(results.results.93.key.note_id)
|
||||
case 94: note_ids.append(results.results.94.key.note_id)
|
||||
case 95: note_ids.append(results.results.95.key.note_id)
|
||||
case 96: note_ids.append(results.results.96.key.note_id)
|
||||
case 97: note_ids.append(results.results.97.key.note_id)
|
||||
case 98: note_ids.append(results.results.98.key.note_id)
|
||||
case 99: note_ids.append(results.results.99.key.note_id)
|
||||
case 100: note_ids.append(results.results.100.key.note_id)
|
||||
case 101: note_ids.append(results.results.101.key.note_id)
|
||||
case 102: note_ids.append(results.results.102.key.note_id)
|
||||
case 103: note_ids.append(results.results.103.key.note_id)
|
||||
case 104: note_ids.append(results.results.104.key.note_id)
|
||||
case 105: note_ids.append(results.results.105.key.note_id)
|
||||
case 106: note_ids.append(results.results.106.key.note_id)
|
||||
case 107: note_ids.append(results.results.107.key.note_id)
|
||||
case 108: note_ids.append(results.results.108.key.note_id)
|
||||
case 109: note_ids.append(results.results.109.key.note_id)
|
||||
case 110: note_ids.append(results.results.110.key.note_id)
|
||||
case 111: note_ids.append(results.results.111.key.note_id)
|
||||
case 112: note_ids.append(results.results.112.key.note_id)
|
||||
case 113: note_ids.append(results.results.113.key.note_id)
|
||||
case 114: note_ids.append(results.results.114.key.note_id)
|
||||
case 115: note_ids.append(results.results.115.key.note_id)
|
||||
case 116: note_ids.append(results.results.116.key.note_id)
|
||||
case 117: note_ids.append(results.results.117.key.note_id)
|
||||
case 118: note_ids.append(results.results.118.key.note_id)
|
||||
case 119: note_ids.append(results.results.119.key.note_id)
|
||||
case 120: note_ids.append(results.results.120.key.note_id)
|
||||
case 121: note_ids.append(results.results.121.key.note_id)
|
||||
case 122: note_ids.append(results.results.122.key.note_id)
|
||||
case 123: note_ids.append(results.results.123.key.note_id)
|
||||
case 124: note_ids.append(results.results.124.key.note_id)
|
||||
case 125: note_ids.append(results.results.125.key.note_id)
|
||||
case 126: note_ids.append(results.results.126.key.note_id)
|
||||
case 127: note_ids.append(results.results.127.key.note_id)
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return note_ids
|
||||
})
|
||||
}
|
||||
|
||||
func lookup_note_by_key<T>(_ key: NoteKey, borrow lendingFunction: (_: borrowing UnownedNdbNote?) throws -> T) rethrows -> T {
|
||||
let txn = NdbTxn(ndb: self) { txn in
|
||||
lookup_note_by_key_with_txn(key, txn: txn)
|
||||
}
|
||||
guard let rawNote = txn?.unsafeUnownedValue else { return try lendingFunction(nil) }
|
||||
let unownedNote = UnownedNdbNote(rawNote)
|
||||
return try lendingFunction(.some(unownedNote))
|
||||
func lookup_note_by_key<T>(_ key: NoteKey, borrow lendingFunction: (_: borrowing UnownedNdbNote?) throws -> T) throws -> T {
|
||||
return try withNdb({
|
||||
let txn = NdbTxn(ndb: self) { txn in
|
||||
lookup_note_by_key_with_txn(key, txn: txn)
|
||||
}
|
||||
guard let rawNote = txn?.unsafeUnownedValue else { return try lendingFunction(nil) }
|
||||
let unownedNote = UnownedNdbNote(rawNote)
|
||||
return try lendingFunction(.some(unownedNote))
|
||||
})
|
||||
}
|
||||
|
||||
func lookup_note_by_key_and_copy(_ key: NoteKey) -> NdbNote? {
|
||||
return lookup_note_by_key(key, borrow: { maybeUnownedNote -> NdbNote? in
|
||||
switch maybeUnownedNote {
|
||||
case .none: return nil
|
||||
case .some(let unownedNote): return unownedNote.toOwned()
|
||||
}
|
||||
func lookup_note_by_key_and_copy(_ key: NoteKey) throws -> NdbNote? {
|
||||
return try withNdb({
|
||||
return try lookup_note_by_key(key, borrow: { maybeUnownedNote -> NdbNote? in
|
||||
switch maybeUnownedNote {
|
||||
case .none: return nil
|
||||
case .some(let unownedNote): return unownedNote.toOwned()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -489,26 +520,30 @@ class Ndb {
|
||||
lookup_profile_by_key_inner(key, txn: txn)
|
||||
}
|
||||
|
||||
func lookup_profile_by_key<T>(key: ProfileKey, borrow lendingFunction: (_: borrowing ProfileRecord?) throws -> T) rethrows -> T {
|
||||
let txn = SafeNdbTxn<ProfileRecord?>.new(on: self) { txn in
|
||||
return lookup_profile_by_key_inner(key, txn: txn)
|
||||
}
|
||||
guard let txn else { return try lendingFunction(nil) }
|
||||
return try lendingFunction(txn.val)
|
||||
func lookup_profile_by_key<T>(key: ProfileKey, borrow lendingFunction: (_: borrowing ProfileRecord?) throws -> T) throws -> T {
|
||||
return try withNdb({
|
||||
let txn = SafeNdbTxn<ProfileRecord?>.new(on: self) { txn in
|
||||
return lookup_profile_by_key_inner(key, txn: txn)
|
||||
}
|
||||
guard let txn else { return try lendingFunction(nil) }
|
||||
return try lendingFunction(txn.val)
|
||||
})
|
||||
}
|
||||
|
||||
private func lookup_note_with_txn<Y>(id: NoteId, txn: NdbTxn<Y>) -> NdbNote? {
|
||||
lookup_note_with_txn_inner(id: id, txn: txn)
|
||||
}
|
||||
|
||||
func lookup_profile_key(_ pubkey: Pubkey) -> ProfileKey? {
|
||||
guard let txn = NdbTxn(ndb: self, with: { txn in
|
||||
lookup_profile_key_with_txn(pubkey, txn: txn)
|
||||
}) else {
|
||||
return nil
|
||||
}
|
||||
func lookup_profile_key(_ pubkey: Pubkey) throws -> ProfileKey? {
|
||||
return try withNdb({
|
||||
guard let txn = NdbTxn(ndb: self, with: { txn in
|
||||
lookup_profile_key_with_txn(pubkey, txn: txn)
|
||||
}) else {
|
||||
return nil
|
||||
}
|
||||
|
||||
return txn.value
|
||||
return txn.value
|
||||
})
|
||||
}
|
||||
|
||||
private func lookup_profile_key_with_txn<Y>(_ pubkey: Pubkey, txn: NdbTxn<Y>) -> ProfileKey? {
|
||||
@@ -537,40 +572,46 @@ class Ndb {
|
||||
}
|
||||
}
|
||||
|
||||
func lookup_note_key(_ id: NoteId) -> NoteKey? {
|
||||
guard let txn = NdbTxn(ndb: self, with: { txn in
|
||||
lookup_note_key_with_txn(id, txn: txn)
|
||||
}) else {
|
||||
return nil
|
||||
}
|
||||
func lookup_note_key(_ id: NoteId) throws -> NoteKey? {
|
||||
return try withNdb({
|
||||
guard let txn = NdbTxn(ndb: self, with: { txn in
|
||||
lookup_note_key_with_txn(id, txn: txn)
|
||||
}) else {
|
||||
return nil
|
||||
}
|
||||
|
||||
return txn.value
|
||||
return txn.value
|
||||
})
|
||||
}
|
||||
|
||||
func lookup_note<T>(_ id: NoteId, borrow lendingFunction: (_: borrowing UnownedNdbNote?) throws -> T) rethrows -> T {
|
||||
let txn = NdbTxn(ndb: self) { txn in
|
||||
lookup_note_with_txn_inner(id: id, txn: txn)
|
||||
}
|
||||
guard let rawNote = txn?.unsafeUnownedValue else { return try lendingFunction(nil) }
|
||||
return try lendingFunction(UnownedNdbNote(rawNote))
|
||||
func lookup_note<T>(_ id: NoteId, borrow lendingFunction: (_: borrowing UnownedNdbNote?) throws -> T) throws -> T {
|
||||
return try withNdb({
|
||||
let txn = NdbTxn(ndb: self) { txn in
|
||||
lookup_note_with_txn_inner(id: id, txn: txn)
|
||||
}
|
||||
guard let rawNote = txn?.unsafeUnownedValue else { return try lendingFunction(nil) }
|
||||
return try lendingFunction(UnownedNdbNote(rawNote))
|
||||
})
|
||||
}
|
||||
|
||||
func lookup_note_and_copy(_ id: NoteId) -> NdbNote? {
|
||||
return self.lookup_note(id, borrow: { unownedNote in
|
||||
func lookup_note_and_copy(_ id: NoteId) throws -> NdbNote? {
|
||||
return try self.lookup_note(id, borrow: { unownedNote in
|
||||
return unownedNote?.toOwned()
|
||||
})
|
||||
}
|
||||
|
||||
func lookup_profile<T>(_ pubkey: Pubkey, borrow lendingFunction: (_: borrowing ProfileRecord?) throws -> T) rethrows -> T {
|
||||
let txn = SafeNdbTxn<ProfileRecord?>.new(on: self) { txn in
|
||||
lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn)
|
||||
}
|
||||
guard let txn else { return try lendingFunction(nil) }
|
||||
return try lendingFunction(txn.val)
|
||||
func lookup_profile<T>(_ pubkey: Pubkey, borrow lendingFunction: (_: borrowing ProfileRecord?) throws -> T) throws -> T {
|
||||
return try withNdb({
|
||||
let txn = SafeNdbTxn<ProfileRecord?>.new(on: self) { txn in
|
||||
lookup_profile_with_txn_inner(pubkey: pubkey, txn: txn)
|
||||
}
|
||||
guard let txn else { return try lendingFunction(nil) }
|
||||
return try lendingFunction(txn.val)
|
||||
})
|
||||
}
|
||||
|
||||
func lookup_profile_lnurl(_ pubkey: Pubkey) -> String? {
|
||||
return lookup_profile(pubkey, borrow: { pr in
|
||||
func lookup_profile_lnurl(_ pubkey: Pubkey) throws -> String? {
|
||||
return try lookup_profile(pubkey, borrow: { pr in
|
||||
switch pr {
|
||||
case .none: return nil
|
||||
case .some(let pr): return pr.lnurl
|
||||
@@ -578,8 +619,8 @@ class Ndb {
|
||||
})
|
||||
}
|
||||
|
||||
func lookup_profile_and_copy(_ pubkey: Pubkey) -> Profile? {
|
||||
return self.lookup_profile(pubkey, borrow: { pr in
|
||||
func lookup_profile_and_copy(_ pubkey: Pubkey) throws -> Profile? {
|
||||
return try self.lookup_profile(pubkey, borrow: { pr in
|
||||
switch pr {
|
||||
case .some(let pr): return pr.profile
|
||||
case .none: return nil
|
||||
@@ -592,18 +633,20 @@ class Ndb {
|
||||
}
|
||||
|
||||
func process_client_event(_ str: String) -> Bool {
|
||||
guard !self.is_closed else { return false }
|
||||
return str.withCString { cstr in
|
||||
return ndb_process_client_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
|
||||
}
|
||||
return (try? withNdb({
|
||||
return str.withCString { cstr in
|
||||
return ndb_process_client_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
|
||||
}
|
||||
})) ?? false
|
||||
}
|
||||
|
||||
func write_profile_last_fetched(pubkey: Pubkey, fetched_at: UInt64) {
|
||||
guard !closed else { return }
|
||||
let _ = pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> () in
|
||||
guard let p = ptr.baseAddress else { return }
|
||||
ndb_write_last_profile_fetch(ndb.ndb, p, fetched_at)
|
||||
}
|
||||
func write_profile_last_fetched(pubkey: Pubkey, fetched_at: UInt64) throws {
|
||||
return try withNdb({
|
||||
let _ = pubkey.id.withUnsafeBytes { (ptr: UnsafeRawBufferPointer) -> () in
|
||||
guard let p = ptr.baseAddress else { return }
|
||||
ndb_write_last_profile_fetch(ndb.ndb, p, fetched_at)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private func read_profile_last_fetched<Y>(txn: NdbTxn<Y>, pubkey: Pubkey) -> UInt64? {
|
||||
@@ -619,41 +662,50 @@ class Ndb {
|
||||
}
|
||||
}
|
||||
|
||||
func read_profile_last_fetched(pubkey: Pubkey) -> UInt64? {
|
||||
var last_fetched: UInt64? = nil
|
||||
let _ = NdbTxn(ndb: self) { txn in
|
||||
last_fetched = read_profile_last_fetched(txn: txn, pubkey: pubkey)
|
||||
}
|
||||
return last_fetched
|
||||
func read_profile_last_fetched(pubkey: Pubkey) throws -> UInt64? {
|
||||
return try withNdb({
|
||||
var last_fetched: UInt64? = nil
|
||||
let _ = NdbTxn(ndb: self) { txn in
|
||||
last_fetched = read_profile_last_fetched(txn: txn, pubkey: pubkey)
|
||||
}
|
||||
return last_fetched
|
||||
})
|
||||
}
|
||||
|
||||
func process_event(_ str: String, originRelayURL: String? = nil) -> Bool {
|
||||
guard !is_closed else { return false }
|
||||
guard let originRelayURL else {
|
||||
let response = try? withNdb({
|
||||
guard !is_closed else { return false }
|
||||
guard let originRelayURL else {
|
||||
return str.withCString { cstr in
|
||||
return ndb_process_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
|
||||
}
|
||||
}
|
||||
return str.withCString { cstr in
|
||||
return ndb_process_event(ndb.ndb, cstr, Int32(str.utf8.count)) != 0
|
||||
return originRelayURL.withCString { originRelayCString in
|
||||
let meta = UnsafeMutablePointer<ndb_ingest_meta>.allocate(capacity: 1)
|
||||
defer { meta.deallocate() }
|
||||
ndb_ingest_meta_init(meta, 0, originRelayCString)
|
||||
return ndb_process_event_with(ndb.ndb, cstr, Int32(str.utf8.count), meta) != 0
|
||||
}
|
||||
}
|
||||
}
|
||||
return str.withCString { cstr in
|
||||
return originRelayURL.withCString { originRelayCString in
|
||||
let meta = UnsafeMutablePointer<ndb_ingest_meta>.allocate(capacity: 1)
|
||||
defer { meta.deallocate() }
|
||||
ndb_ingest_meta_init(meta, 0, originRelayCString)
|
||||
return ndb_process_event_with(ndb.ndb, cstr, Int32(str.utf8.count), meta) != 0
|
||||
}
|
||||
}
|
||||
})
|
||||
return response ?? false
|
||||
}
|
||||
|
||||
func process_events(_ str: String) -> Bool {
|
||||
guard !is_closed else { return false }
|
||||
return str.withCString { cstr in
|
||||
return ndb_process_events(ndb.ndb, cstr, str.utf8.count) != 0
|
||||
}
|
||||
let response = try? withNdb({
|
||||
return str.withCString { cstr in
|
||||
return ndb_process_events(ndb.ndb, cstr, str.utf8.count) != 0
|
||||
}
|
||||
})
|
||||
return response ?? false
|
||||
}
|
||||
|
||||
func search_profile(_ search: String, limit: Int) -> [Pubkey] {
|
||||
guard let txn = NdbTxn<()>.init(ndb: self) else { return [] }
|
||||
return search_profile(search, limit: limit, txn: txn)
|
||||
func search_profile(_ search: String, limit: Int) throws -> [Pubkey] {
|
||||
return try withNdb({
|
||||
guard let txn = NdbTxn<()>.init(ndb: self) else { return [] }
|
||||
return search_profile(search, limit: limit, txn: txn)
|
||||
})
|
||||
}
|
||||
|
||||
private func search_profile<Y>(_ search: String, limit: Int, txn: NdbTxn<Y>) -> [Pubkey] {
|
||||
@@ -684,9 +736,11 @@ class Ndb {
|
||||
|
||||
// MARK: NdbFilter queries and subscriptions
|
||||
|
||||
func query(filters: [NdbFilter], maxResults: Int) throws(NdbStreamError) -> [NoteKey] {
|
||||
guard let txn = NdbTxn(ndb: self) else { return [] }
|
||||
return try query(with: txn, filters: filters, maxResults: maxResults)
|
||||
func query(filters: [NdbFilter], maxResults: Int) throws -> [NoteKey] {
|
||||
return try withNdb({
|
||||
guard let txn = NdbTxn(ndb: self) else { return [] }
|
||||
return try query(with: txn, filters: filters, maxResults: maxResults)
|
||||
})
|
||||
}
|
||||
|
||||
/// Safe wrapper around the `ndb_query` C function
|
||||
@@ -696,8 +750,7 @@ class Ndb {
|
||||
/// - maxResults: Maximum number of results to return
|
||||
/// - Returns: Array of note keys matching the filters
|
||||
/// - Throws: NdbStreamError if the query fails
|
||||
private func query<Y>(with txn: NdbTxn<Y>, filters: [NdbFilter], maxResults: Int) throws(NdbStreamError) -> [NoteKey] {
|
||||
guard !self.is_closed else { throw .ndbClosed }
|
||||
private func query<Y>(with txn: NdbTxn<Y>, filters: [NdbFilter], maxResults: Int) throws -> [NoteKey] {
|
||||
let filtersPointer = UnsafeMutablePointer<ndb_filter>.allocate(capacity: filters.count)
|
||||
defer { filtersPointer.deallocate() }
|
||||
|
||||
@@ -711,7 +764,6 @@ class Ndb {
|
||||
let results = UnsafeMutablePointer<ndb_query_result>.allocate(capacity: maxResults)
|
||||
defer { results.deallocate() }
|
||||
|
||||
guard !self.is_closed else { throw .ndbClosed }
|
||||
guard ndb_query(&txn.txn, filtersPointer, Int32(filters.count), results, Int32(maxResults), count) == 1 else {
|
||||
throw NdbStreamError.initialQueryFailed
|
||||
}
|
||||
@@ -760,7 +812,7 @@ class Ndb {
|
||||
}
|
||||
|
||||
// Set up subscription
|
||||
subid = ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count))
|
||||
guard let subid = try? withNdb({ ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count)) }) else { return }
|
||||
|
||||
// We are setting the continuation after issuing the subscription call.
|
||||
// This won't cause lost notes because if any notes get issued before registering
|
||||
@@ -789,17 +841,24 @@ class Ndb {
|
||||
|
||||
do { try Task.checkCancellation() } catch { throw NdbStreamError.cancelled }
|
||||
|
||||
// CRITICAL: Create the subscription FIRST before querying to avoid race condition
|
||||
// This ensures that any events indexed after subscription but before query won't be missed
|
||||
let newEventsStream = ndbSubscribe(filters: filters)
|
||||
var noteIds: [NoteKey] = []
|
||||
|
||||
// Now fetch initial results after subscription is registered
|
||||
guard let txn = NdbTxn(ndb: self) else { throw NdbStreamError.cannotOpenTransaction }
|
||||
|
||||
// Use our safe wrapper instead of direct C function call
|
||||
let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults)
|
||||
|
||||
do { try Task.checkCancellation() } catch { throw NdbStreamError.cancelled }
|
||||
let newEventsStream = try withNdb({
|
||||
|
||||
// CRITICAL: Create the subscription FIRST before querying to avoid race condition
|
||||
// This ensures that any events indexed after subscription but before query won't be missed
|
||||
let newEventsStream = ndbSubscribe(filters: filters)
|
||||
|
||||
// Now fetch initial results after subscription is registered
|
||||
guard let txn = NdbTxn(ndb: self) else { throw NdbStreamError.cannotOpenTransaction }
|
||||
|
||||
// Use our safe wrapper instead of direct C function call
|
||||
noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults)
|
||||
|
||||
do { try Task.checkCancellation() } catch { throw NdbStreamError.cancelled }
|
||||
|
||||
return newEventsStream
|
||||
})
|
||||
|
||||
// Create a cascading stream that combines initial results with new events
|
||||
return AsyncStream<StreamItem> { continuation in
|
||||
@@ -861,7 +920,9 @@ class Ndb {
|
||||
}
|
||||
|
||||
func was(noteKey: NoteKey, seenOn relayUrl: String) throws -> Bool {
|
||||
return try self.was(noteKey: noteKey, seenOn: relayUrl, txn: nil)
|
||||
return try withNdb({
|
||||
return try self.was(noteKey: noteKey, seenOn: relayUrl, txn: nil)
|
||||
})
|
||||
}
|
||||
|
||||
/// Determines if a given note was seen on any of the listed relay URLs
|
||||
@@ -877,7 +938,9 @@ class Ndb {
|
||||
|
||||
/// Determines if a given note was seen on any of the listed relay URLs
|
||||
func was(noteKey: NoteKey, seenOnAnyOf relayUrls: [String]) throws -> Bool {
|
||||
return try self.was(noteKey: noteKey, seenOnAnyOf: relayUrls, txn: nil)
|
||||
return try withNdb({
|
||||
return try self.was(noteKey: noteKey, seenOnAnyOf: relayUrls, txn: nil)
|
||||
})
|
||||
}
|
||||
|
||||
// MARK: Internal ndb callback interfaces
|
||||
@@ -1035,4 +1098,3 @@ func getDebugCheckedRoot<T: FlatBufferObject>(byteBuffer: inout ByteBuffer) thro
|
||||
func remove_file_prefix(_ str: String) -> String {
|
||||
return str.replacingOccurrences(of: "file://", with: "")
|
||||
}
|
||||
|
||||
|
||||
@@ -478,8 +478,8 @@ extension NdbNote {
|
||||
return ThreadReply(tags: self.tags)?.reply.note_id
|
||||
}
|
||||
|
||||
func block_offsets<T>(ndb: Ndb, borrow lendingFunction: (_: borrowing NdbBlockGroup.BlocksMetadata?) throws -> T) rethrows -> T {
|
||||
guard let key = ndb.lookup_note_key(self.id) else { return try lendingFunction(nil) }
|
||||
func block_offsets<T>(ndb: Ndb, borrow lendingFunction: (_: borrowing NdbBlockGroup.BlocksMetadata?) throws -> T) throws -> T {
|
||||
guard let key = try ndb.lookup_note_key(self.id) else { return try lendingFunction(nil) }
|
||||
|
||||
return try ndb.lookup_blocks_by_key(key, borrow: { blocks in
|
||||
return try lendingFunction(blocks)
|
||||
|
||||
+45
-28
@@ -24,6 +24,12 @@ class NdbTxn<T>: RawNdbTxnAccessible {
|
||||
static func pure(ndb: Ndb, val: T) -> NdbTxn<T> {
|
||||
.init(ndb: ndb, txn: ndb_txn(), val: val, generation: ndb.generation, inherited: true, name: "pure_txn")
|
||||
}
|
||||
|
||||
/// Simple helper struct for the init function to avoid compiler errors encountered by using other techniques
|
||||
private struct R {
|
||||
let txn: ndb_txn
|
||||
let generation: Int
|
||||
}
|
||||
|
||||
init?(ndb: Ndb, with: (NdbTxn<T>) -> T = { _ in () }, name: String? = nil) {
|
||||
guard !ndb.is_closed else { return nil }
|
||||
@@ -43,17 +49,18 @@ class NdbTxn<T>: RawNdbTxnAccessible {
|
||||
let new_ref_count = ref_count + 1
|
||||
Thread.current.threadDictionary["ndb_txn_ref_count"] = new_ref_count
|
||||
} else {
|
||||
self.txn = ndb_txn()
|
||||
guard !ndb.is_closed else { return nil }
|
||||
self.generation = ndb.generation
|
||||
#if TXNDEBUG
|
||||
txn_count += 1
|
||||
#endif
|
||||
let ok = ndb_begin_query(ndb.ndb.ndb, &self.txn) != 0
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
self.generation = ndb.generation
|
||||
let result: R? = try? ndb.withNdb({
|
||||
var txn = ndb_txn()
|
||||
#if TXNDEBUG
|
||||
txn_count += 1
|
||||
#endif
|
||||
let ok = ndb_begin_query(ndb.ndb.ndb, &txn) != 0
|
||||
guard ok else { return .none }
|
||||
return .some(R(txn: txn, generation: ndb.generation))
|
||||
}, maxWaitTimeout: .milliseconds(200))
|
||||
guard let result else { return nil }
|
||||
self.txn = result.txn
|
||||
self.generation = result.generation
|
||||
Thread.current.threadDictionary["ndb_txn"] = self.txn
|
||||
Thread.current.threadDictionary["ndb_txn_ref_count"] = 1
|
||||
Thread.current.threadDictionary["txn_generation"] = ndb.generation
|
||||
@@ -97,7 +104,9 @@ class NdbTxn<T>: RawNdbTxnAccessible {
|
||||
Thread.current.threadDictionary["ndb_txn_ref_count"] = new_ref_count
|
||||
assert(new_ref_count >= 0, "NdbTxn reference count should never be below zero")
|
||||
if new_ref_count <= 0 {
|
||||
ndb_end_query(&self.txn)
|
||||
_ = try? ndb.withNdb({
|
||||
ndb_end_query(&self.txn)
|
||||
}, maxWaitTimeout: .milliseconds(200))
|
||||
Thread.current.threadDictionary.removeObject(forKey: "ndb_txn")
|
||||
Thread.current.threadDictionary.removeObject(forKey: "ndb_txn_ref_count")
|
||||
}
|
||||
@@ -156,10 +165,16 @@ class SafeNdbTxn<T: ~Copyable> {
|
||||
.init(ndb: ndb, txn: ndb_txn(), val: val, generation: ndb.generation, inherited: true, name: "pure_txn")
|
||||
}
|
||||
|
||||
/// Simple helper struct for the init function to avoid compiler errors encountered by using other techniques
|
||||
private struct R {
|
||||
let txn: ndb_txn
|
||||
let generation: Int
|
||||
}
|
||||
|
||||
static func new(on ndb: Ndb, with valueGetter: (PlaceholderNdbTxn) -> T? = { _ in () }, name: String = "txn") -> SafeNdbTxn<T>? {
|
||||
guard !ndb.is_closed else { return nil }
|
||||
var generation = ndb.generation
|
||||
var txn: ndb_txn
|
||||
let generation: Int
|
||||
let txn: ndb_txn
|
||||
let inherited: Bool
|
||||
if let active_txn = Thread.current.threadDictionary["ndb_txn"] as? ndb_txn,
|
||||
let txn_generation = Thread.current.threadDictionary["txn_generation"] as? Int,
|
||||
@@ -174,26 +189,26 @@ class SafeNdbTxn<T: ~Copyable> {
|
||||
let new_ref_count = ref_count + 1
|
||||
Thread.current.threadDictionary["ndb_txn_ref_count"] = new_ref_count
|
||||
} else {
|
||||
txn = ndb_txn()
|
||||
guard !ndb.is_closed else { return nil }
|
||||
generation = ndb.generation
|
||||
#if TXNDEBUG
|
||||
txn_count += 1
|
||||
#endif
|
||||
let ok = ndb_begin_query(ndb.ndb.ndb, &txn) != 0
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
generation = ndb.generation
|
||||
let result: R? = try? ndb.withNdb({
|
||||
var txn = ndb_txn()
|
||||
#if TXNDEBUG
|
||||
txn_count += 1
|
||||
#endif
|
||||
let ok = ndb_begin_query(ndb.ndb.ndb, &txn) != 0
|
||||
guard ok else { return .none }
|
||||
return .some(R(txn: txn, generation: ndb.generation))
|
||||
}, maxWaitTimeout: .milliseconds(200))
|
||||
guard let result else { return nil }
|
||||
txn = result.txn
|
||||
generation = result.generation
|
||||
Thread.current.threadDictionary["ndb_txn"] = txn
|
||||
Thread.current.threadDictionary["ndb_txn_ref_count"] = 1
|
||||
Thread.current.threadDictionary["txn_generation"] = ndb.generation
|
||||
inherited = false
|
||||
}
|
||||
#if TXNDEBUG
|
||||
print("txn: open gen\(self.generation) '\(self.name)' \(txn_count)")
|
||||
print("txn: open gen\(generation) '\(name)' \(txn_count)")
|
||||
#endif
|
||||
let moved = false
|
||||
let placeholderTxn = PlaceholderNdbTxn(txn: txn)
|
||||
guard let val = valueGetter(placeholderTxn) else { return nil }
|
||||
return SafeNdbTxn<T>(ndb: ndb, txn: txn, val: val, generation: generation, inherited: inherited, name: name)
|
||||
@@ -223,7 +238,9 @@ class SafeNdbTxn<T: ~Copyable> {
|
||||
Thread.current.threadDictionary["ndb_txn_ref_count"] = new_ref_count
|
||||
assert(new_ref_count >= 0, "NdbTxn reference count should never be below zero")
|
||||
if new_ref_count <= 0 {
|
||||
ndb_end_query(&self.txn)
|
||||
_ = try? ndb.withNdb({
|
||||
ndb_end_query(&self.txn)
|
||||
}, maxWaitTimeout: .milliseconds(200))
|
||||
Thread.current.threadDictionary.removeObject(forKey: "ndb_txn")
|
||||
Thread.current.threadDictionary.removeObject(forKey: "ndb_txn_ref_count")
|
||||
}
|
||||
|
||||
@@ -0,0 +1,197 @@
|
||||
//
|
||||
// NdbUseLock.swift
|
||||
// damus
|
||||
//
|
||||
// Created by Daniel D’Aquino on 2025-11-12.
|
||||
//
|
||||
|
||||
import Dispatch
|
||||
import Synchronization
|
||||
|
||||
extension Ndb {
|
||||
/// Creates a `sync` mechanism for coordinating usages of ndb (read or write) with the app's ability to close ndb.
|
||||
///
|
||||
/// This prevents race condition between threads reading from `ndb` and the app trying to close `ndb`
|
||||
///
|
||||
/// Implementation notes:
|
||||
/// - This was made as a synchronous mechanism because using `async` solutions (e.g. isolating `Ndb` into an `NdbActor`)
|
||||
/// creates a necessity to change way too much code around the codebase, the interface becomes more cumbersome and difficult to use,
|
||||
/// and might create unnecessary async delays (e.g. it would prevent two tasks from reading Ndb data at once)
|
||||
@available(iOS 18.0, *)
|
||||
class UseLock: UseLockProtocol {
|
||||
/// Number of functions using the `ndb` object (for reading or writing data)
|
||||
private let ndbUserCount = Mutex<UInt>(0)
|
||||
/// Semaphore for general access to `ndb`. A closing task requires exclusive access. Users of `ndb` (read/write tasks) share the access
|
||||
private let ndbAccessSemaphore: DispatchSemaphore = DispatchSemaphore(value: 0)
|
||||
private let ndbIsOpen = Mutex<Bool>(false)
|
||||
/// How long a thread can block before throwing an error
|
||||
private static let DEFAULT_TIMEOUT: DispatchTimeInterval = .milliseconds(500)
|
||||
|
||||
/// Keeps the ndb open while performing some specified operation.
|
||||
///
|
||||
/// **WARNING:** Ensure ndb is open _before_ calling this, otherwise the thread may block for the `maxTimeout` period.
|
||||
/// **Implementation note:** NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation
|
||||
///
|
||||
/// - Parameter operation: The operation to perform while `ndb` is open. Keep this as short as safely possible!
|
||||
/// - Parameter maxTimeout: The maximum amount of time the function will wait for the lock before giving up.
|
||||
/// - Returns: The return result for the given operation
|
||||
func keepNdbOpen<T>(during operation: () throws -> T, maxWaitTimeout: DispatchTimeInterval = DEFAULT_TIMEOUT) throws -> T {
|
||||
try self.incrementUserCount(maxTimeout: maxWaitTimeout)
|
||||
defer { self.decrementUserCount() } // Use defer to guarantee this will always be called no matter the outcome of the function
|
||||
return try operation()
|
||||
}
|
||||
|
||||
/// Waits for ndb to be able to close, then closes it.
|
||||
///
|
||||
/// - Parameter operation: The operation to close. Must return the final boolean value indicating if ndb was closed in the end
|
||||
///
|
||||
/// Implementation note: NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation
|
||||
func waitUntilNdbCanClose(thenClose operation: () -> Bool, maxTimeout: DispatchTimeInterval = DEFAULT_TIMEOUT) throws {
|
||||
try ndbAccessSemaphore.waitOrThrow(timeout: .now() + maxTimeout)
|
||||
ndbIsOpen.withLock { ndbIsOpen in
|
||||
ndbIsOpen = operation()
|
||||
if ndbIsOpen {
|
||||
ndbAccessSemaphore.signal()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func markNdbOpen() {
|
||||
ndbIsOpen.withLock { ndbIsOpen in
|
||||
if !ndbIsOpen {
|
||||
ndbIsOpen = true
|
||||
ndbAccessSemaphore.signal()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func incrementUserCount(maxTimeout: DispatchTimeInterval = .seconds(2)) throws {
|
||||
try ndbUserCount.withLock { currentCount in
|
||||
// Signal that ndb cannot close while we have at least one user using ndb
|
||||
if currentCount == 0 {
|
||||
try ndbAccessSemaphore.waitOrThrow(timeout: .now() + maxTimeout)
|
||||
}
|
||||
currentCount += 1
|
||||
}
|
||||
}
|
||||
|
||||
private func decrementUserCount() {
|
||||
ndbUserCount.withLock { currentCount in
|
||||
currentCount -= 1
|
||||
// Signal that ndb can close if we have zero users using ndb
|
||||
if currentCount == 0 {
|
||||
ndbAccessSemaphore.signal()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum LockError: Error {
|
||||
case timeout
|
||||
}
|
||||
}
|
||||
|
||||
/// A fallback implementation for `UseLock` that works in iOS older than iOS 18, with reduced syncing mechanisms
|
||||
class FallbackUseLock: UseLockProtocol {
|
||||
/// Number of functions using the `ndb` object (for reading or writing data)
|
||||
private var ndbUserCount: UInt = 0
|
||||
/// Semaphore for general access to `ndb`. A closing task requires exclusive access. Users of `ndb` (read/write tasks) share the access
|
||||
private let ndbAccessSemaphore: DispatchSemaphore = DispatchSemaphore(value: 0)
|
||||
/// How long a thread can block before throwing an error
|
||||
private static let DEFAULT_TIMEOUT: DispatchTimeInterval = .milliseconds(500)
|
||||
|
||||
/// Keeps the ndb open while performing some specified operation.
|
||||
///
|
||||
/// **WARNING:** Ensure ndb is open _before_ calling this, otherwise the thread may block for the `maxTimeout` period.
|
||||
/// **Implementation note:** NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation
|
||||
///
|
||||
/// - Parameter operation: The operation to perform while `ndb` is open. Keep this as short as safely possible!
|
||||
/// - Parameter maxTimeout: The maximum amount of time the function will wait for the lock before giving up.
|
||||
/// - Returns: The return result for the given operation
|
||||
func keepNdbOpen<T>(during operation: () throws -> T, maxWaitTimeout: DispatchTimeInterval = DEFAULT_TIMEOUT) throws -> T {
|
||||
try self.incrementUserCount(maxTimeout: maxWaitTimeout)
|
||||
defer { self.decrementUserCount() } // Use defer to guarantee this will always be called no matter the outcome of the function
|
||||
return try operation()
|
||||
}
|
||||
|
||||
/// Waits for ndb to be able to close, then closes it.
|
||||
///
|
||||
/// - Parameter operation: The operation to close. Must return the final boolean value indicating if ndb was closed in the end
|
||||
///
|
||||
/// Implementation note: NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation
|
||||
func waitUntilNdbCanClose(thenClose operation: () -> Bool, maxTimeout: DispatchTimeInterval = DEFAULT_TIMEOUT) throws {
|
||||
try ndbAccessSemaphore.waitOrThrow(timeout: .now() + maxTimeout)
|
||||
let ndbIsOpen = operation()
|
||||
if ndbIsOpen {
|
||||
ndbAccessSemaphore.signal()
|
||||
}
|
||||
}
|
||||
|
||||
/// Marks `ndb` as open to allow other users to use it. Do not call this more than once
|
||||
func markNdbOpen() {
|
||||
ndbAccessSemaphore.signal()
|
||||
}
|
||||
|
||||
private func incrementUserCount(maxTimeout: DispatchTimeInterval = .seconds(2)) throws {
|
||||
if ndbUserCount == 0 {
|
||||
try ndbAccessSemaphore.waitOrThrow(timeout: .now() + maxTimeout)
|
||||
}
|
||||
ndbUserCount += 1
|
||||
}
|
||||
|
||||
private func decrementUserCount() {
|
||||
ndbUserCount -= 1
|
||||
// Signal that ndb can close if we have zero users using ndb
|
||||
if ndbUserCount == 0 {
|
||||
ndbAccessSemaphore.signal()
|
||||
}
|
||||
}
|
||||
|
||||
enum LockError: Error {
|
||||
case timeout
|
||||
}
|
||||
}
|
||||
|
||||
protocol UseLockProtocol {
|
||||
/// Keeps the ndb open while performing some specified operation.
|
||||
///
|
||||
/// **WARNING:** Ensure ndb is open _before_ calling this, otherwise the thread may block for the `maxTimeout` period.
|
||||
/// **Implementation note:** NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation
|
||||
///
|
||||
/// - Parameter operation: The operation to perform while `ndb` is open. Keep this as short as safely possible!
|
||||
/// - Parameter maxTimeout: The maximum amount of time the function will wait for the lock before giving up.
|
||||
/// - Returns: The return result for the given operation
|
||||
func keepNdbOpen<T>(during operation: () throws -> T, maxWaitTimeout: DispatchTimeInterval) throws -> T
|
||||
|
||||
/// Waits for ndb to be able to close, then closes it.
|
||||
///
|
||||
/// - Parameter operation: The operation to close. Must return the final boolean value indicating if ndb was closed in the end
|
||||
///
|
||||
/// Implementation note: NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation
|
||||
func waitUntilNdbCanClose(thenClose operation: () -> Bool, maxTimeout: DispatchTimeInterval) throws
|
||||
|
||||
/// Marks `ndb` as open to allow other users to use it. Do not call this more than once
|
||||
func markNdbOpen()
|
||||
}
|
||||
|
||||
static func initLock() -> UseLockProtocol {
|
||||
if #available(iOS 18.0, *) {
|
||||
return UseLock()
|
||||
} else {
|
||||
return FallbackUseLock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fileprivate extension DispatchSemaphore {
|
||||
func waitOrThrow(timeout: DispatchTime) throws(TimingError) {
|
||||
let result = self.wait(timeout: timeout)
|
||||
switch result {
|
||||
case .success: return
|
||||
case .timedOut: throw .timeout
|
||||
}
|
||||
}
|
||||
|
||||
enum TimingError: Error {
|
||||
case timeout
|
||||
}
|
||||
}
|
||||
@@ -63,15 +63,14 @@ final class NdbTests: XCTestCase {
|
||||
do {
|
||||
let ndb = Ndb(path: db_dir)!
|
||||
let id = NoteId(hex: "d12c17bde3094ad32f4ab862a6cc6f5c289cfe7d5802270bdf34904df585f349")!
|
||||
guard let txn = NdbTxn(ndb: ndb) else { return XCTAssert(false) }
|
||||
let note = ndb.lookup_note_and_copy(id)
|
||||
let note = try? ndb.lookup_note_and_copy(id)
|
||||
XCTAssertNotNil(note)
|
||||
guard let note else { return }
|
||||
let pk = Pubkey(hex: "32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245")!
|
||||
XCTAssertEqual(note.pubkey, pk)
|
||||
|
||||
let profile = ndb.lookup_profile_and_copy(pk)
|
||||
let lnurl = ndb.lookup_profile_lnurl(pk)
|
||||
let profile = try? ndb.lookup_profile_and_copy(pk)
|
||||
let lnurl = try? ndb.lookup_profile_lnurl(pk)
|
||||
XCTAssertNotNil(profile)
|
||||
guard let profile else { return }
|
||||
|
||||
@@ -91,14 +90,14 @@ final class NdbTests: XCTestCase {
|
||||
|
||||
do {
|
||||
let ndb = Ndb(path: db_dir)!
|
||||
let note_ids = ndb.text_search(query: "barked")
|
||||
let note_ids = (try? ndb.text_search(query: "barked")) ?? []
|
||||
XCTAssertEqual(note_ids.count, 1)
|
||||
let expected_note_id = NoteId(hex: "b17a540710fe8495b16bfbaf31c6962c4ba8387f3284a7973ad523988095417e")!
|
||||
guard note_ids.count > 0 else {
|
||||
XCTFail("Expected at least one note to be found")
|
||||
return
|
||||
}
|
||||
let note_id = ndb.lookup_note_by_key(note_ids[0], borrow: { maybeUnownedNote -> NoteId? in
|
||||
let note_id = try? ndb.lookup_note_by_key(note_ids[0], borrow: { maybeUnownedNote -> NoteId? in
|
||||
switch maybeUnownedNote {
|
||||
case .none: return nil
|
||||
case .some(let unownedNote): return unownedNote.id
|
||||
|
||||
Reference in New Issue
Block a user