diff --git a/damus/Core/Storage/DatabaseSnapshotManager.swift b/damus/Core/Storage/DatabaseSnapshotManager.swift
index feca7251..33c7d86a 100644
--- a/damus/Core/Storage/DatabaseSnapshotManager.swift
+++ b/damus/Core/Storage/DatabaseSnapshotManager.swift
@@ -108,6 +108,9 @@ actor DatabaseSnapshotManager {
     }
 
     /// Perform the actual snapshot operation.
+    ///
+    /// Creates a storage-efficient snapshot by creating a new temporary Ndb instance
+    /// and selectively copying only the necessary notes (profiles, mute lists, contact lists).
     func performSnapshot() async throws {
         guard let snapshotPath = Ndb.snapshot_db_path else {
             throw SnapshotError.pathsUnavailable
@@ -115,7 +118,7 @@ actor DatabaseSnapshotManager {
 
         Log.info("Starting nostrdb snapshot to %{public}@", for: .storage, snapshotPath)
 
-        try await copyDatabase(to: snapshotPath)
+        try await createSelectiveSnapshot(to: snapshotPath)
 
         // Update the last snapshot date
         UserDefaults.standard.set(Date(), forKey: Self.lastSnapshotDateKey)
@@ -124,40 +127,193 @@ actor DatabaseSnapshotManager {
         self.snapshotCount += 1
     }
 
-    /// Copy the database using LMDB's native copy function.
-    private func copyDatabase(to snapshotPath: String) async throws {
-        return try await withCheckedThrowingContinuation { continuation in
-            let fileManager = FileManager.default
-
-            // Delete existing database files at the destination if they exist
-            // LMDB creates multiple files (data.mdb, lock.mdb), so we remove the entire directory
-            if fileManager.fileExists(atPath: snapshotPath) {
-                do {
-                    try fileManager.removeItem(atPath: snapshotPath)
-                    Log.debug("Removed existing snapshot at %{public}@", for: .storage, snapshotPath)
-                } catch {
-                    continuation.resume(throwing: SnapshotError.removeFailed(error))
-                    return
-                }
-            }
-
-            Log.debug("Recreate the snapshot directory", for: .storage, snapshotPath)
-            // Recreate the snapshot directory
-            do {
-                try fileManager.createDirectory(atPath: snapshotPath, withIntermediateDirectories: true)
-            } catch {
-                continuation.resume(throwing: SnapshotError.directoryCreationFailed(error))
-                return
-            }
-
-            do {
-                try ndb.snapshot(path: snapshotPath)
-                continuation.resume(returning: ())
-            }
-            catch {
-                continuation.resume(throwing: SnapshotError.copyFailed(error))
-            }
-        }
+    /// Creates a selective snapshot containing only profiles, mute lists, and contact lists.
+    ///
+    /// This method:
+    /// 1. Creates a temporary Ndb instance in a temp directory
+    /// 2. Queries the source database for relevant notes
+    /// 3. Writes each note to the temporary database
+    /// 4. Moves the temporary database to the final destination
+    ///
+    /// - Parameter snapshotPath: Path at which the finished snapshot is installed.
+    /// - Throws: `SnapshotError.directoryCreationFailed`, `SnapshotError.failedToCreateSnapshotDatabase`,
+    ///   or any error thrown while copying notes or moving the snapshot into place.
+    private func createSelectiveSnapshot(to snapshotPath: String) async throws {
+        let fileManager = FileManager.default
+
+        // Create a temporary directory for the snapshot
+        let tempSnapshotPath = fileManager.temporaryDirectory
+            .appendingPathComponent("snapshot_temp_\(UUID().uuidString)")
+
+        do {
+            try fileManager.createDirectory(atPath: tempSnapshotPath.path, withIntermediateDirectories: true)
+        } catch {
+            throw SnapshotError.directoryCreationFailed(error)
+        }
+
+        // Best-effort cleanup on every exit path. After a successful move nothing is left
+        // at this path, so the removal fails silently.
+        defer {
+            try? fileManager.removeItem(atPath: tempSnapshotPath.path)
+        }
+
+        Log.debug("Created temporary snapshot directory at %{public}@", for: .storage, tempSnapshotPath.path)
+
+        // Create a new Ndb instance in the temporary directory
+        guard let snapshotNdb = Ndb(path: tempSnapshotPath.path, owns_db_file: true) else {
+            throw SnapshotError.failedToCreateSnapshotDatabase
+        }
+
+        Log.debug("Created temporary Ndb instance for snapshot", for: .storage)
+
+        // Query and copy notes to snapshot database.
+        // The database is closed exactly once on every path: in the catch below when copying
+        // fails, or explicitly further down before the files are moved.
+        do {
+            try await copyNotesToSnapshot(snapshotNdb: snapshotNdb)
+        } catch {
+            snapshotNdb.close()
+            throw error
+        }
+
+        Log.debug("Copied notes to snapshot database", for: .storage)
+
+        // Close the snapshot database before moving files
+        snapshotNdb.close()
+
+        // Move the temporary database to the final destination
+        try await moveSnapshotToFinalDestination(from: tempSnapshotPath.path, to: snapshotPath)
+
+        Log.debug("Moved snapshot to final destination", for: .storage)
+    }
+
+    /// Queries the source database and copies relevant notes to the snapshot database.
+    ///
+    /// - Parameter snapshotNdb: The open snapshot database that receives the notes.
+    /// - Throws: Errors from filter creation, querying, note lookup, or adding a note.
+    private func copyNotesToSnapshot(snapshotNdb: Ndb) async throws {
+        // Per-filter cap on query results; reaching it means notes may have been left out
+        let maxResultsPerFilter = 100_000
+        let filters = try createSnapshotFilters()
+
+        Log.debug("Querying source database with %d filters", for: .storage, filters.count)
+
+        var totalNotesCopied = 0
+
+        for filter in filters {
+            let noteKeys = try ndb.query(filters: [filter], maxResults: maxResultsPerFilter)
+
+            Log.debug("Found %d notes for filter", for: .storage, noteKeys.count)
+
+            if noteKeys.count >= maxResultsPerFilter {
+                Log.error("Snapshot query reached the limit of %d results; some notes may be missing", for: .storage, maxResultsPerFilter)
+            }
+
+            for noteKey in noteKeys {
+                // Get the note from source database and copy to snapshot
+                try ndb.lookup_note_by_key(noteKey, borrow: { unownedNote in
+                    // Take an owned copy of the borrowed note before handing it to the snapshot database
+                    guard let ownedNote = unownedNote?.toOwned() else {
+                        Log.error("Failed to get unowned note", for: .storage)
+                        return
+                    }
+
+                    // Process the note into the snapshot database
+
+                    // Implementation note: This does not _immediately_ add the event to the new Ndb.
+                    // It goes into the ingester queue first for later processing.
+                    // This raises the question: How to guarantee that all notes will be saved to the new
+                    // snapshot Ndb before we close it?
+                    //
+                    // The answer is that when `Ndb.close` is called, it actually waits for the ingester task
+                    // to finish processing its queue — unless the queue is full (an edge case).
+                    try snapshotNdb.add(event: ownedNote)
+                    totalNotesCopied += 1
+                })
+            }
+        }
+
+        Log.info("Copied %d notes to snapshot database", for: .storage, totalNotesCopied)
+    }
+
+    /// Creates filters for querying profiles, mute lists, and contact lists.
+    ///
+    /// - Returns: One filter each for metadata, contacts, and mute list notes, in that order.
+    /// - Throws: Any error thrown while building an `NdbFilter`.
+    private func createSnapshotFilters() throws -> [NdbFilter] {
+        // Filter for profile metadata (kind 0)
+        let profileFilter = try NdbFilter(from: NostrFilter(kinds: [.metadata]))
+
+        // Filter for contact lists (kind 3)
+        let contactsFilter = try NdbFilter(from: NostrFilter(kinds: [.contacts]))
+
+        // Filter for mute lists (kind 10000)
+        let muteListFilter = try NdbFilter(from: NostrFilter(kinds: [.mute_list]))
+
+        return [profileFilter, contactsFilter, muteListFilter]
+    }
+
+    /// Moves the snapshot from its temporary location to the final destination.
+    ///
+    /// An existing snapshot at `finalPath` is renamed to a sibling backup instead of being deleted
+    /// up front, so it can be put back if moving the new snapshot into place fails.
+    ///
+    /// - Parameters:
+    ///   - tempPath: Path of the freshly built snapshot.
+    ///   - finalPath: Path at which the snapshot should end up.
+    /// - Throws: `SnapshotError.removeFailed` if the existing snapshot cannot be set aside,
+    ///   `SnapshotError.directoryCreationFailed` if the parent directory cannot be created,
+    ///   or `SnapshotError.moveFailed` if the new snapshot cannot be moved into place.
+    private func moveSnapshotToFinalDestination(from tempPath: String, to finalPath: String) async throws {
+        let fileManager = FileManager.default
+        let finalURL = URL(fileURLWithPath: finalPath)
+        let parentURL = finalURL.deletingLastPathComponent()
+        // Sibling path that holds the previous snapshot until the new one is in place
+        let backupPath = parentURL.appendingPathComponent(finalURL.lastPathComponent + ".previous").path
+        var previousSnapshotSetAside = false
+
+        // Set the existing snapshot aside if it exists
+        if fileManager.fileExists(atPath: finalPath) {
+            do {
+                // Drop a stale backup left behind by an interrupted run
+                if fileManager.fileExists(atPath: backupPath) {
+                    try fileManager.removeItem(atPath: backupPath)
+                }
+                try fileManager.moveItem(atPath: finalPath, toPath: backupPath)
+                previousSnapshotSetAside = true
+                Log.debug("Set aside existing snapshot at %{public}@", for: .storage, finalPath)
+            } catch {
+                throw SnapshotError.removeFailed(error)
+            }
+        }
+
+        // Create parent directory if needed
+        let parentDir = parentURL.path
+        if !fileManager.fileExists(atPath: parentDir) {
+            do {
+                try fileManager.createDirectory(atPath: parentDir, withIntermediateDirectories: true)
+            } catch {
+                throw SnapshotError.directoryCreationFailed(error)
+            }
+        }
+
+        // Move the temp snapshot to the final destination
+        do {
+            try fileManager.moveItem(atPath: tempPath, toPath: finalPath)
+            Log.debug("Moved snapshot from %{public}@ to %{public}@", for: .storage, tempPath, finalPath)
+        } catch {
+            // Put the previous snapshot back so a failed move does not leave the destination empty
+            if previousSnapshotSetAside {
+                try? fileManager.removeItem(atPath: finalPath)
+                try? fileManager.moveItem(atPath: backupPath, toPath: finalPath)
+            }
+            throw SnapshotError.moveFailed(error)
+        }
+
+        // The new snapshot is in place, so the previous one is no longer needed
+        if previousSnapshotSetAside {
+            try? fileManager.removeItem(atPath: backupPath)
+        }
     }
 
     // MARK: - Stats functions
@@ -179,6 +335,8 @@ enum SnapshotError: Error, LocalizedError {
     case copyFailed(any Error)
     case removeFailed(Error)
     case directoryCreationFailed(Error)
+    case failedToCreateSnapshotDatabase
+    case moveFailed(Error)
 
     var errorDescription: String? {
         switch self {
@@ -190,6 +348,10 @@ enum SnapshotError: Error, LocalizedError {
             return "Failed to remove existing snapshot: \(error.localizedDescription)"
         case .directoryCreationFailed(let error):
             return "Failed to create snapshot directory: \(error.localizedDescription)"
+        case .failedToCreateSnapshotDatabase:
+            return "Failed to create temporary snapshot database"
+        case .moveFailed(let error):
+            return "Failed to move snapshot to final destination: \(error.localizedDescription)"
         }
     }
 }
diff --git a/damusTests/DatabaseSnapshotManagerTests.swift b/damusTests/DatabaseSnapshotManagerTests.swift
index 19a806ad..5943923f 100644
--- a/damusTests/DatabaseSnapshotManagerTests.swift
+++ b/damusTests/DatabaseSnapshotManagerTests.swift
@@ -8,12 +8,78 @@
 import XCTest
 @testable import damus
 
-final class DatabaseSnapshotManagerTests: XCTestCase {
+class DatabaseSnapshotManagerTests: XCTestCase {
 
     var tempDirectory: URL!
     var manager: DatabaseSnapshotManager!
     var testNdb: Ndb!
 
+    /// Helper function to collect note IDs from a database subscription until expected notes are found or timeout occurs.
+    /// - Parameters:
+    ///   - ndb: The database instance to subscribe to
+    ///   - filters: The filters to use for subscription
+    ///   - expectedNoteIds: The set of note IDs we expect to find
+    ///   - expectation: The XCTestExpectation to fulfill when all notes are found
+    ///   - timeout: Maximum time to wait in seconds (default: 5.0)
+    /// - Returns: A task whose value is the set of collected note IDs
+    private func collectNoteIds(
+        from ndb: Ndb,
+        filters: [NdbFilter],
+        expectedNoteIds: Set<NoteId>,
+        expectation: XCTestExpectation,
+        timeout: TimeInterval = 5.0
+    ) -> Task<Set<NoteId>, Never> {
+        Task {
+            await withCheckedContinuation { continuation in
+                // State shared by both tasks below; only touched while holding `lock`
+                let lock = NSLock()
+                var collectedNoteIds = Set<NoteId>()
+                var hasReturned = false
+                // Hands the collected IDs to the first caller only, so the continuation is resumed exactly once
+                let claimResult: () -> Set<NoteId>? = {
+                    lock.withLock {
+                        guard !hasReturned else { return nil }
+                        hasReturned = true
+                        return collectedNoteIds
+                    }
+                }
+
+                // Timeout handler
+                Task {
+                    try? await Task.sleep(for: .seconds(timeout))
+                    guard let collected = claimResult() else { return }
+                    print("⚠️ Timeout: Expected \(expectedNoteIds.count) notes, collected \(collected.count)")
+                    continuation.resume(returning: collected)
+                }
+
+                // Subscription handler
+                Task {
+                    do {
+                        for await item in try ndb.subscribe(filters: filters) {
+                            guard !lock.withLock({ hasReturned }) else { break }
+                            guard case .event(let noteKey) = item else { continue }
+                            try ndb.lookup_note_by_key(noteKey, borrow: { unownedNote in
+                                guard let noteId = unownedNote?.id else { return }
+                                lock.withLock { _ = collectedNoteIds.insert(noteId) }
+                            })
+                            guard lock.withLock({ collectedNoteIds == expectedNoteIds }) else { continue }
+                            // Everything arrived: stop streaming, and report only if the timeout has not fired first
+                            if let collected = claimResult() {
+                                expectation.fulfill()
+                                continuation.resume(returning: collected)
+                            }
+                            break
+                        }
+                    } catch {
+                        guard let collected = claimResult() else { return }
+                        XCTFail("Note streaming failed: \(error)")
+                        continuation.resume(returning: collected)
+                    }
+                }
+            }
+        }
+    }
+
     override func setUp() async throws {
         try await super.setUp()
 
@@ -276,6 +342,146 @@ final class DatabaseSnapshotManagerTests: XCTestCase {
         XCTAssertFalse(result)
     }
 
+    // MARK: - Selective Snapshot Content Tests
+
+    func testPerformSnapshot_ContainsOnlyRelevantNoteTypes() async throws {
+        // Given: A database with various note types
+        let profileNote = NostrEvent(content: "{\"name\":\"Test User\"}", keypair: test_keypair, kind: NostrKind.metadata.rawValue)!
+        let textNote = NostrEvent(content: "Hello world", keypair: test_keypair, kind: NostrKind.text.rawValue)!
+        let contactsNote = NostrEvent(content: "", keypair: test_keypair, kind: NostrKind.contacts.rawValue)!
+        let muteListNote = NostrEvent(content: "", keypair: test_keypair, kind: NostrKind.mute_list.rawValue)!
+
+        let profileFilter = try NdbFilter(from: NostrFilter(kinds: [.metadata]))
+        let contactsFilter = try NdbFilter(from: NostrFilter(kinds: [.contacts]))
+        let muteListFilter = try NdbFilter(from: NostrFilter(kinds: [.mute_list]))
+        let textFilter = try NdbFilter(from: NostrFilter(kinds: [.text]))
+
+        // Process notes into source database
+        let expectedIngestedNotes = [profileNote, textNote, contactsNote, muteListNote]
+        let expectedSnapshottedNotes = [profileNote, contactsNote, muteListNote]
+
+        let expectedIngestedNoteIds = Set(expectedIngestedNotes.map { $0.id })
+        let expectedSnapshottedNoteIds = Set(expectedSnapshottedNotes.map { $0.id })
+
+        let allNotesAreIngestedInSourceDB = XCTestExpectation(description: "All notes are ingested in source DB")
+        let ingestTask = collectNoteIds(
+            from: testNdb,
+            filters: [profileFilter, contactsFilter, muteListFilter, textFilter],
+            expectedNoteIds: expectedIngestedNoteIds,
+            expectation: allNotesAreIngestedInSourceDB
+        )
+
+        for note in expectedIngestedNotes {
+            try testNdb.add(event: note)
+        }
+
+        await fulfillment(of: [allNotesAreIngestedInSourceDB], timeout: 5)
+        let ingestedNoteIds = await ingestTask.value
+        XCTAssertEqual(expectedIngestedNoteIds, ingestedNoteIds)
+
+        guard let snapshotPath = Ndb.snapshot_db_path else {
+            XCTFail("Snapshot path should be available")
+            return
+        }
+
+        // When: Creating a snapshot
+        try await manager.performSnapshot()
+
+        // Then: Snapshot database should exist
+        XCTAssertTrue(FileManager.default.fileExists(atPath: snapshotPath))
+
+        // And: Snapshot should contain only profiles (kind 0), contacts (kind 3), and mute lists (kind 10000)
+        guard let snapshotNdb = Ndb(path: snapshotPath, owns_db_file: false) else {
+            XCTFail("Should be able to open snapshot database")
+            return
+        }
+        defer { snapshotNdb.close() }
+
+        let allNotesAreSnapshottedToSnapshotDB = XCTestExpectation(description: "All notes are snapshotted to snapshot DB")
+        let snapshotTask = collectNoteIds(
+            from: snapshotNdb,
+            filters: [profileFilter, contactsFilter, muteListFilter, textFilter],
+            expectedNoteIds: expectedSnapshottedNoteIds,
+            expectation: allNotesAreSnapshottedToSnapshotDB
+        )
+
+        await fulfillment(of: [allNotesAreSnapshottedToSnapshotDB], timeout: 5)
+        let snapshottedNoteIds = await snapshotTask.value
+        XCTAssertEqual(expectedSnapshottedNoteIds, snapshottedNoteIds)
+    }
+
+    func testPerformSnapshot_HandlesEmptyDatabase() async throws {
+        // Given: An empty database with no notes
+        guard let snapshotPath = Ndb.snapshot_db_path else {
+            XCTFail("Snapshot path should be available")
+            return
+        }
+
+        // When: Creating a snapshot of an empty database
+        try await manager.performSnapshot()
+
+        // Then: Snapshot should be created successfully
+        XCTAssertTrue(FileManager.default.fileExists(atPath: snapshotPath))
+
+        // And: Snapshot should be accessible but contain no notes
+        guard let snapshotNdb = Ndb(path: snapshotPath, owns_db_file: false) else {
+            XCTFail("Should be able to open snapshot database")
+            return
+        }
+        defer { snapshotNdb.close() }
+
+        let allFilter = try NdbFilter(from: NostrFilter())
+        let allKeys = try snapshotNdb.query(filters: [allFilter], maxResults: 100)
+        XCTAssertEqual(allKeys.count, 0, "Empty database snapshot should contain no notes")
+    }
+
+    func testPerformSnapshot_HandlesLargeNumberOfNotes() async throws {
+        // Given: A database with many profile notes
+        var profileNotes: [NostrEvent] = []
+        for i in 0..<2000 {
+            let profileNote = NostrEvent(content: "{\"name\":\"User \(i)\"}", keypair: generate_new_keypair().to_keypair(), kind: NostrKind.metadata.rawValue)!
+            profileNotes.append(profileNote)
+        }
+
+        let profileFilter = try NdbFilter(from: NostrFilter(kinds: [.metadata]))
+        let expectedNoteIds = Set(profileNotes.map { $0.id })
+        let allNotesIngested = XCTestExpectation(description: "All 2000 profile notes are ingested")
+
+        let ingestTask = collectNoteIds(
+            from: testNdb,
+            filters: [profileFilter],
+            expectedNoteIds: expectedNoteIds,
+            expectation: allNotesIngested
+        )
+
+        for profileNote in profileNotes {
+            try testNdb.add(event: profileNote)
+        }
+
+        // Wait for all notes to be ingested before snapshot
+        await fulfillment(of: [allNotesIngested], timeout: 10)
+        let ingestedNoteIds = await ingestTask.value
+        XCTAssertEqual(expectedNoteIds, ingestedNoteIds, "All 2000 profile notes should be ingested")
+
+        guard let snapshotPath = Ndb.snapshot_db_path else {
+            XCTFail("Snapshot path should be available")
+            return
+        }
+
+        // When: Creating a snapshot
+        try await manager.performSnapshot()
+
+        // Then: Snapshot should contain all profile notes
+        guard let snapshotNdb = Ndb(path: snapshotPath, owns_db_file: false) else {
+            XCTFail("Should be able to open snapshot database")
+            return
+        }
+        defer { snapshotNdb.close() }
+
+        let profileKeys = try snapshotNdb.query(filters: [profileFilter], maxResults: 100_000)
+        XCTAssertEqual(profileKeys.count, 2000, "Snapshot should contain all 2000 profile notes")
+    }
+
     // MARK: - Edge Case Tests
 
     func testSnapshotInterval_BoundaryCondition() async throws {
@@ -301,6 +507,78 @@ final class DatabaseSnapshotManagerTests: XCTestCase {
         // Then: No snapshot should be created
         XCTAssertFalse(shouldCreate, "Snapshot should not be created before minimum interval")
     }
+
+    func testPerformSnapshot_ReplacesExistingSnapshot() async throws {
+        // Given: A snapshot already exists with a profile note
+        let firstProfileNote = NostrEvent(content: "{\"name\":\"First User\"}", keypair: generate_new_keypair().to_keypair(), kind: NostrKind.metadata.rawValue)!
+
+        let profileFilter = try NdbFilter(from: NostrFilter(kinds: [.metadata]))
+        let firstNoteIds = Set([firstProfileNote.id])
+        let firstNoteIngested = XCTestExpectation(description: "First note is ingested")
+
+        let firstIngestTask = collectNoteIds(
+            from: testNdb,
+            filters: [profileFilter],
+            expectedNoteIds: firstNoteIds,
+            expectation: firstNoteIngested
+        )
+
+        try testNdb.add(event: firstProfileNote)
+
+        await fulfillment(of: [firstNoteIngested], timeout: 5)
+        let firstIngestedNoteIds = await firstIngestTask.value
+        XCTAssertEqual(firstNoteIds, firstIngestedNoteIds, "First profile note should be ingested")
+
+        try await manager.performSnapshot()
+
+        guard let snapshotPath = Ndb.snapshot_db_path else {
+            XCTFail("Snapshot path should be available")
+            return
+        }
+
+        // Add a new profile note to the source database
+        let secondProfileNote = NostrEvent(content: "{\"name\":\"Second User\"}", keypair: generate_new_keypair().to_keypair(), kind: NostrKind.metadata.rawValue)!
+
+        let bothNoteIds = Set([firstProfileNote.id, secondProfileNote.id])
+        let secondNoteIngested = XCTestExpectation(description: "Second note is ingested")
+
+        let secondIngestTask = collectNoteIds(
+            from: testNdb,
+            filters: [profileFilter],
+            expectedNoteIds: bothNoteIds,
+            expectation: secondNoteIngested
+        )
+
+        try testNdb.add(event: secondProfileNote)
+
+        await fulfillment(of: [secondNoteIngested], timeout: 5)
+        let secondIngestedNoteIds = await secondIngestTask.value
+        XCTAssertEqual(bothNoteIds, secondIngestedNoteIds, "Both profile notes should be ingested")
+
+        // When: Creating another snapshot
+        try await manager.performSnapshot()
+
+        // Then: New snapshot should replace the old one and contain both notes
+        guard let snapshotNdb = Ndb(path: snapshotPath, owns_db_file: false) else {
+            XCTFail("Should be able to open snapshot database")
+            return
+        }
+        defer { snapshotNdb.close() }
+
+        let expectedNoteIds = Set([firstProfileNote.id, secondProfileNote.id])
+        let allNotesAreInSnapshot = XCTestExpectation(description: "All notes are in snapshot")
+
+        let snapshotTask = collectNoteIds(
+            from: snapshotNdb,
+            filters: [profileFilter],
+            expectedNoteIds: expectedNoteIds,
+            expectation: allNotesAreInSnapshot
+        )
+
+        await fulfillment(of: [allNotesAreInSnapshot], timeout: 5)
+        let snapshottedNoteIds = await snapshotTask.value
+        XCTAssertEqual(expectedNoteIds, snapshottedNoteIds, "Snapshot should contain both profile notes")
+    }
 }
 
 
@@ -317,9 +595,12 @@ extension SnapshotError: Equatable {
             return true
         case (.directoryCreationFailed, .directoryCreationFailed):
             return true
+        case (.failedToCreateSnapshotDatabase, .failedToCreateSnapshotDatabase):
+            return true
+        case (.moveFailed, .moveFailed):
+            return true
         default:
             return false
         }
     }
 }
-