Create NostrDB streaming and async lookup interfaces
This commit introduces new interfaces for working with NostrDB from Swift, including `NostrFilter` conversion, subscription streaming via AsyncStreams and lookup/wait functions. No user-facing changes. Changelog-None Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
This commit is contained in:
@@ -33,11 +33,12 @@ class Ndb {
|
||||
let owns_db: Bool
|
||||
var generation: Int
|
||||
private var closed: Bool
|
||||
private var callbackHandler: Ndb.CallbackHandler
|
||||
|
||||
var is_closed: Bool {
|
||||
self.closed || self.ndb.ndb == nil
|
||||
}
|
||||
|
||||
|
||||
static func safemode() -> Ndb? {
|
||||
guard let path = db_path ?? old_db_path else { return nil }
|
||||
|
||||
@@ -80,7 +81,7 @@ class Ndb {
|
||||
return Ndb(ndb: ndb_t(ndb: nil))
|
||||
}
|
||||
|
||||
static func open(path: String? = nil, owns_db_file: Bool = true) -> ndb_t? {
|
||||
static func open(path: String? = nil, owns_db_file: Bool = true, callbackHandler: Ndb.CallbackHandler) -> ndb_t? {
|
||||
var ndb_p: OpaquePointer? = nil
|
||||
|
||||
let ingest_threads: Int32 = 4
|
||||
@@ -111,6 +112,19 @@ class Ndb {
|
||||
var ok = false
|
||||
while !ok && mapsize > 1024 * 1024 * 700 {
|
||||
var cfg = ndb_config(flags: 0, ingester_threads: ingest_threads, mapsize: mapsize, filter_context: nil, ingest_filter: nil, sub_cb_ctx: nil, sub_cb: nil)
|
||||
|
||||
// Here we hook up the global callback function for subscription callbacks.
|
||||
// We do an "unretained" pass here because the lifetime of the callback handler is larger than the lifetime of the nostrdb monitor in the C code.
|
||||
// The NostrDB monitor that makes the callbacks should in theory _never_ outlive the callback handler.
|
||||
//
|
||||
// This means that:
|
||||
// - for as long as nostrdb is running, its parent Ndb instance will be alive, keeping the callback handler alive.
|
||||
// - when the Ndb instance is deinitialized — and the callback handler comes down with it — the `deinit` function will destroy the nostrdb monitor, preventing it from accessing freed memory.
|
||||
//
|
||||
// Therefore, we do not need to increase reference count to callbackHandler. The tightly coupled lifetimes will ensure that it is always alive when the ndb_monitor is alive.
|
||||
let ctx: UnsafeMutableRawPointer = Unmanaged.passUnretained(callbackHandler).toOpaque()
|
||||
ndb_config_set_subscription_callback(&cfg, subscription_callback, ctx)
|
||||
|
||||
let res = ndb_init(&ndb_p, testdir, &cfg);
|
||||
ok = res != 0;
|
||||
if !ok {
|
||||
@@ -124,12 +138,15 @@ class Ndb {
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return ndb_t(ndb: ndb_p)
|
||||
|
||||
let ndb_instance = ndb_t(ndb: ndb_p)
|
||||
Task { await callbackHandler.set(ndb: ndb_instance) }
|
||||
return ndb_instance
|
||||
}
|
||||
|
||||
init?(path: String? = nil, owns_db_file: Bool = true) {
|
||||
guard let db = Self.open(path: path, owns_db_file: owns_db_file) else {
|
||||
let callbackHandler = Ndb.CallbackHandler()
|
||||
guard let db = Self.open(path: path, owns_db_file: owns_db_file, callbackHandler: callbackHandler) else {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -138,6 +155,7 @@ class Ndb {
|
||||
self.owns_db = owns_db_file
|
||||
self.ndb = db
|
||||
self.closed = false
|
||||
self.callbackHandler = callbackHandler
|
||||
}
|
||||
|
||||
private static func migrate_db_location_if_needed() throws {
|
||||
@@ -183,6 +201,8 @@ class Ndb {
|
||||
self.path = nil
|
||||
self.owns_db = true
|
||||
self.closed = false
|
||||
// This simple initialization will cause subscriptions not to be ever called. Probably fine because this initializer is used only for empty example ndb instances.
|
||||
self.callbackHandler = Ndb.CallbackHandler()
|
||||
}
|
||||
|
||||
func close() {
|
||||
@@ -196,7 +216,7 @@ class Ndb {
|
||||
|
||||
func reopen() -> Bool {
|
||||
guard self.is_closed,
|
||||
let db = Self.open(path: self.path, owns_db_file: self.owns_db) else {
|
||||
let db = Self.open(path: self.path, owns_db_file: self.owns_db, callbackHandler: self.callbackHandler) else {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -581,10 +601,220 @@ class Ndb {
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: NdbFilter queries and subscriptions
|
||||
|
||||
/// Safe wrapper around the `ndb_query` C function
|
||||
/// - Parameters:
|
||||
/// - txn: Database transaction
|
||||
/// - filters: Array of NdbFilter objects
|
||||
/// - maxResults: Maximum number of results to return
|
||||
/// - Returns: Array of note keys matching the filters
|
||||
/// - Throws: NdbStreamError if the query fails
|
||||
func query<Y>(with txn: NdbTxn<Y>, filters: [NdbFilter], maxResults: Int) throws(NdbStreamError) -> [NoteKey] {
|
||||
let filtersPointer = UnsafeMutablePointer<ndb_filter>.allocate(capacity: filters.count)
|
||||
defer { filtersPointer.deallocate() }
|
||||
|
||||
for (index, ndbFilter) in filters.enumerated() {
|
||||
filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter
|
||||
}
|
||||
|
||||
let count = UnsafeMutablePointer<Int32>.allocate(capacity: 1)
|
||||
defer { count.deallocate() }
|
||||
|
||||
let results = UnsafeMutablePointer<ndb_query_result>.allocate(capacity: maxResults)
|
||||
defer { results.deallocate() }
|
||||
|
||||
guard ndb_query(&txn.txn, filtersPointer, Int32(filters.count), results, Int32(maxResults), count) == 1 else {
|
||||
throw NdbStreamError.initialQueryFailed
|
||||
}
|
||||
|
||||
var noteIds: [NoteKey] = []
|
||||
for i in 0..<count.pointee {
|
||||
noteIds.append(results.advanced(by: Int(i)).pointee.note_id)
|
||||
}
|
||||
|
||||
return noteIds
|
||||
}
|
||||
|
||||
/// Safe wrapper around `ndb_subscribe` that handles all pointer management
|
||||
/// - Parameters:
|
||||
/// - filters: Array of NdbFilter objects
|
||||
/// - Returns: AsyncStream of StreamItem events for new matches only
|
||||
private func ndbSubscribe(filters: [NdbFilter]) -> AsyncStream<StreamItem> {
|
||||
return AsyncStream<StreamItem> { continuation in
|
||||
// Allocate filters pointer - will be deallocated when subscription ends
|
||||
// Cannot use `defer` to deallocate `filtersPointer` because it needs to remain valid for the lifetime of the subscription, which extends beyond this block's scope.
|
||||
let filtersPointer = UnsafeMutablePointer<ndb_filter>.allocate(capacity: filters.count)
|
||||
for (index, ndbFilter) in filters.enumerated() {
|
||||
filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter
|
||||
}
|
||||
|
||||
var streaming = true
|
||||
var subid: UInt64 = 0
|
||||
var terminationStarted = false
|
||||
|
||||
// Set up termination handler
|
||||
continuation.onTermination = { @Sendable _ in
|
||||
guard !terminationStarted else { return } // Avoid race conditions between two termination closures
|
||||
terminationStarted = true
|
||||
Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
|
||||
streaming = false
|
||||
// Clean up resources on early termination
|
||||
if subid != 0 {
|
||||
ndb_unsubscribe(self.ndb.ndb, subid)
|
||||
Task { await self.unsetCallback(subscriptionId: subid) }
|
||||
}
|
||||
filtersPointer.deallocate()
|
||||
}
|
||||
|
||||
if !streaming {
|
||||
return
|
||||
}
|
||||
|
||||
// Set up subscription
|
||||
subid = ndb_subscribe(self.ndb.ndb, filtersPointer, Int32(filters.count))
|
||||
|
||||
// Set the subscription callback
|
||||
Task {
|
||||
await self.setCallback(for: subid, callback: { noteKey in
|
||||
continuation.yield(.event(noteKey))
|
||||
})
|
||||
}
|
||||
|
||||
// Update termination handler to include subscription cleanup
|
||||
continuation.onTermination = { @Sendable _ in
|
||||
guard !terminationStarted else { return } // Avoid race conditions between two termination closures
|
||||
terminationStarted = true
|
||||
Log.debug("ndb_wait: stream: Terminated early", for: .ndb)
|
||||
streaming = false
|
||||
ndb_unsubscribe(self.ndb.ndb, subid)
|
||||
Task { await self.unsetCallback(subscriptionId: subid) }
|
||||
filtersPointer.deallocate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func subscribe(filters: [NdbFilter], maxSimultaneousResults: Int = 1000) throws(NdbStreamError) -> AsyncStream<StreamItem> {
|
||||
// Fetch initial results
|
||||
guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction }
|
||||
|
||||
// Use our safe wrapper instead of direct C function call
|
||||
let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults)
|
||||
|
||||
// Create a subscription for new events
|
||||
let newEventsStream = ndbSubscribe(filters: filters)
|
||||
|
||||
// Create a cascading stream that combines initial results with new events
|
||||
return AsyncStream<StreamItem> { continuation in
|
||||
// Stream all results already present in the database
|
||||
for noteId in noteIds {
|
||||
continuation.yield(.event(noteId))
|
||||
}
|
||||
|
||||
// Indicate this is the end of the results currently present in the database
|
||||
continuation.yield(.eose)
|
||||
|
||||
// Create a task to forward events from the subscription stream
|
||||
let forwardingTask = Task {
|
||||
for await item in newEventsStream {
|
||||
continuation.yield(item)
|
||||
}
|
||||
continuation.finish()
|
||||
}
|
||||
|
||||
// Handle termination by canceling the forwarding task
|
||||
continuation.onTermination = { @Sendable _ in
|
||||
forwardingTask.cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func waitWithoutTimeout(for noteId: NoteId) async throws(NdbLookupError) -> NdbTxn<NdbNote>? {
|
||||
do {
|
||||
for try await item in try self.subscribe(filters: [NostrFilter(ids: [noteId])]) {
|
||||
switch item {
|
||||
case .eose:
|
||||
continue
|
||||
case .event(let noteKey):
|
||||
guard let txn = NdbTxn(ndb: self) else { throw NdbLookupError.cannotOpenTransaction }
|
||||
guard let note = self.lookup_note_by_key_with_txn(noteKey, txn: txn) else { throw NdbLookupError.internalInconsistency }
|
||||
if note.id == noteId {
|
||||
Log.debug("ndb wait: %d has matching id %s. Returning transaction", for: .ndb, noteKey, noteId.hex())
|
||||
return NdbTxn<NdbNote>.pure(ndb: self, val: note)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch {
|
||||
if let error = error as? NdbStreamError { throw NdbLookupError.streamError(error) }
|
||||
else if let error = error as? NdbLookupError { throw error }
|
||||
else { throw .internalInconsistency }
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func waitFor(noteId: NoteId, timeout: TimeInterval = 10) async throws(NdbLookupError) -> NdbTxn<NdbNote>? {
|
||||
do {
|
||||
return try await withCheckedThrowingContinuation({ continuation in
|
||||
var done = false
|
||||
let waitTask = Task {
|
||||
do {
|
||||
Log.debug("ndb_wait: Waiting for %s", for: .ndb, noteId.hex())
|
||||
let result = try await self.waitWithoutTimeout(for: noteId)
|
||||
if !done {
|
||||
Log.debug("ndb_wait: Found %s", for: .ndb, noteId.hex())
|
||||
continuation.resume(returning: result)
|
||||
done = true
|
||||
}
|
||||
}
|
||||
catch {
|
||||
if Task.isCancelled {
|
||||
return // the timeout task will handle throwing the timeout error
|
||||
}
|
||||
if !done {
|
||||
Log.debug("ndb_wait: Error on %s: %s", for: .ndb, noteId.hex(), error.localizedDescription)
|
||||
continuation.resume(throwing: error)
|
||||
done = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let timeoutTask = Task {
|
||||
try await Task.sleep(for: .seconds(Int(timeout)))
|
||||
if !done {
|
||||
Log.debug("ndb_wait: Timeout on %s. Cancelling wait task…", for: .ndb, noteId.hex())
|
||||
done = true
|
||||
print("ndb_wait: throwing timeout error")
|
||||
continuation.resume(throwing: NdbLookupError.timeout)
|
||||
}
|
||||
waitTask.cancel()
|
||||
}
|
||||
})
|
||||
}
|
||||
catch {
|
||||
if let error = error as? NdbLookupError { throw error }
|
||||
else { throw .internalInconsistency }
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Internal ndb callback interfaces
|
||||
|
||||
internal func setCallback(for subscriptionId: UInt64, callback: @escaping (NoteKey) -> Void) async {
|
||||
await self.callbackHandler.set(callback: callback, for: subscriptionId)
|
||||
}
|
||||
|
||||
internal func unsetCallback(subscriptionId: UInt64) async {
|
||||
await self.callbackHandler.unset(subid: subscriptionId)
|
||||
}
|
||||
|
||||
// MARK: Helpers
|
||||
|
||||
enum Errors: Error {
|
||||
case cannot_find_db_path
|
||||
case db_file_migration_error
|
||||
}
|
||||
|
||||
// MARK: Deinitialization
|
||||
|
||||
deinit {
|
||||
print("txn: Ndb de-init")
|
||||
@@ -592,6 +822,87 @@ class Ndb {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// MARK: - Extensions and helper structures and functions
|
||||
|
||||
extension Ndb {
|
||||
/// A class that is used to handle callbacks from nostrdb
|
||||
///
|
||||
/// This is a separate class from `Ndb` because it simplifies the initialization logic
|
||||
actor CallbackHandler {
|
||||
/// Holds the ndb instance in the C codebase. Should be shared with `Ndb`
|
||||
var ndb: ndb_t? = nil
|
||||
/// A map from nostrdb subscription ids to callbacks
|
||||
var subscriptionCallbackMap: [UInt64: (NoteKey) -> Void] = [:]
|
||||
|
||||
func set(callback: @escaping (NoteKey) -> Void, for subid: UInt64) {
|
||||
subscriptionCallbackMap[subid] = callback
|
||||
}
|
||||
|
||||
func unset(subid: UInt64) {
|
||||
subscriptionCallbackMap[subid] = nil
|
||||
}
|
||||
|
||||
func set(ndb: ndb_t?) {
|
||||
self.ndb = ndb
|
||||
}
|
||||
|
||||
/// Handles callbacks from nostrdb subscriptions, and routes them to the correct callback
|
||||
func handleSubscriptionCallback(subId: UInt64, maxCapacity: Int32 = 1000) {
|
||||
if let callback = subscriptionCallbackMap[subId] {
|
||||
let result = UnsafeMutablePointer<UInt64>.allocate(capacity: Int(maxCapacity))
|
||||
defer { result.deallocate() } // Ensure we deallocate memory before leaving the function to avoid memory leaks
|
||||
if let ndb {
|
||||
let numberOfNotes = ndb_poll_for_notes(ndb.ndb, subId, result, maxCapacity)
|
||||
for i in 0..<numberOfNotes {
|
||||
callback(result.advanced(by: Int(i)).pointee)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An item that comes out of a subscription stream
|
||||
enum StreamItem {
|
||||
/// End of currently stored events
|
||||
case eose
|
||||
/// An event in NostrDB available at the given note key
|
||||
case event(NoteKey)
|
||||
}
|
||||
|
||||
/// An error that may happen during nostrdb streaming
|
||||
enum NdbStreamError: Error {
|
||||
case cannotOpenTransaction
|
||||
case cannotConvertFilter(any Error)
|
||||
case initialQueryFailed
|
||||
case timeout
|
||||
}
|
||||
|
||||
/// An error that may happen when looking something up
|
||||
enum NdbLookupError: Error {
|
||||
case cannotOpenTransaction
|
||||
case streamError(NdbStreamError)
|
||||
case internalInconsistency
|
||||
case timeout
|
||||
}
|
||||
}
|
||||
|
||||
/// This callback "trampoline" function will be called when new notes arrive for NostrDB subscriptions.
|
||||
///
|
||||
/// This is needed as a separate global function in order to allow us to pass it to the C code as a callback (We can't pass native Swift fuctions directly as callbacks).
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - ctx: A pointer to a context object setup during initialization. This allows this function to "find" the correct place to call. MUST be a pointer to a `CallbackHandler`, otherwise this will trigger a crash
|
||||
/// - subid: The NostrDB subscription ID, which identifies the subscription that is being called back
|
||||
@_cdecl("subscription_callback")
|
||||
public func subscription_callback(ctx: UnsafeMutableRawPointer?, subid: UInt64) {
|
||||
guard let ctx else { return }
|
||||
let handler = Unmanaged<Ndb.CallbackHandler>.fromOpaque(ctx).takeUnretainedValue()
|
||||
Task {
|
||||
await handler.handleSubscriptionCallback(subId: subid)
|
||||
}
|
||||
}
|
||||
|
||||
#if DEBUG
|
||||
func getDebugCheckedRoot<T: FlatBufferObject>(byteBuffer: inout ByteBuffer) throws -> T {
|
||||
return getRoot(byteBuffer: &byteBuffer)
|
||||
|
||||
Reference in New Issue
Block a user