nostrdb: migrations: make migrations asyncronous
This also seems to fix some issues with older migrations. Fixes: https://github.com/damus-io/nostrdb/issues/58 Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
committed by
Daniel D’Aquino
parent
05baba9c03
commit
289a8e262a
@@ -57,7 +57,7 @@ static const int DEFAULT_QUEUE_SIZE = 32768;
|
||||
#define NDB_PARSED_TAGS (1 << 6)
|
||||
#define NDB_PARSED_ALL (NDB_PARSED_ID|NDB_PARSED_PUBKEY|NDB_PARSED_SIG|NDB_PARSED_CREATED_AT|NDB_PARSED_KIND|NDB_PARSED_CONTENT|NDB_PARSED_TAGS)
|
||||
|
||||
typedef int (*ndb_migrate_fn)(struct ndb *);
|
||||
typedef int (*ndb_migrate_fn)(struct ndb_txn *);
|
||||
typedef int (*ndb_word_parser_fn)(void *, const char *word, int word_len,
|
||||
int word_index);
|
||||
|
||||
@@ -135,6 +135,7 @@ enum ndb_writer_msgtype {
|
||||
NDB_WRITER_DBMETA, // write ndb metadata
|
||||
NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
|
||||
NDB_WRITER_BLOCKS, // write parsed note blocks
|
||||
NDB_WRITER_MIGRATE, // migrate the database
|
||||
};
|
||||
|
||||
// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
|
||||
@@ -1589,47 +1590,33 @@ cleanup:
|
||||
//
|
||||
|
||||
// This was before we had note_profile_pubkey{,_kind} indices. Let's create them.
|
||||
static int ndb_migrate_profile_indices(struct ndb *ndb)
|
||||
static int ndb_migrate_profile_indices(struct ndb_txn *txn)
|
||||
{
|
||||
struct ndb_txn txn;
|
||||
int count;
|
||||
|
||||
if (!ndb_begin_rw_query(ndb, &txn)) {
|
||||
fprintf(stderr, "ndb_migrate_profile_indices: ndb_begin_rw_query failed\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
enum ndb_dbs indices[] = {NDB_DB_NOTE_PUBKEY, NDB_DB_NOTE_PUBKEY_KIND};
|
||||
if ((count = ndb_rebuild_note_indices(&txn, indices, 2)) != -1) {
|
||||
if ((count = ndb_rebuild_note_indices(txn, indices, 2)) != -1) {
|
||||
fprintf(stderr, "migrated %d notes to have pubkey and pubkey_kind indices\n", count);
|
||||
ndb_end_query(&txn);
|
||||
return 1;
|
||||
} else {
|
||||
fprintf(stderr, "error migrating notes to have pubkey and pubkey_kind indices, aborting.\n");
|
||||
mdb_txn_abort(txn.mdb_txn);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static int ndb_migrate_user_search_indices(struct ndb *ndb)
|
||||
static int ndb_migrate_user_search_indices(struct ndb_txn *txn)
|
||||
{
|
||||
int rc;
|
||||
MDB_cursor *cur;
|
||||
MDB_val k, v;
|
||||
void *profile_root;
|
||||
NdbProfileRecord_table_t record;
|
||||
struct ndb_txn txn;
|
||||
struct ndb_note *note;
|
||||
uint64_t note_key, profile_key;
|
||||
size_t len;
|
||||
int count;
|
||||
|
||||
if (!ndb_begin_rw_query(ndb, &txn)) {
|
||||
fprintf(stderr, "ndb_migrate_user_search_indices: ndb_begin_rw_query failed\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) {
|
||||
if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) {
|
||||
fprintf(stderr, "ndb_migrate_user_search_indices: mdb_cursor_open failed, error %d\n", rc);
|
||||
return 0;
|
||||
}
|
||||
@@ -1642,18 +1629,16 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb)
|
||||
profile_key = *((uint64_t*)k.mv_data);
|
||||
record = NdbProfileRecord_as_root(profile_root);
|
||||
note_key = NdbProfileRecord_note_key(record);
|
||||
note = ndb_get_note_by_key(&txn, note_key, &len);
|
||||
note = ndb_get_note_by_key(txn, note_key, &len);
|
||||
|
||||
if (note == NULL) {
|
||||
fprintf(stderr, "ndb_migrate_user_search_indices: note lookup failed\n");
|
||||
return 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!ndb_write_profile_search_indices(&txn, note, profile_key,
|
||||
if (!ndb_write_profile_search_indices(txn, note, profile_key,
|
||||
profile_root)) {
|
||||
|
||||
fprintf(stderr, "ndb_migrate_user_search_indices: ndb_write_profile_search_indices failed\n");
|
||||
return 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
count++;
|
||||
@@ -1663,51 +1648,33 @@ static int ndb_migrate_user_search_indices(struct ndb *ndb)
|
||||
|
||||
mdb_cursor_close(cur);
|
||||
|
||||
ndb_end_query(&txn);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int ndb_migrate_lower_user_search_indices(struct ndb *ndb)
|
||||
static int ndb_migrate_lower_user_search_indices(struct ndb_txn *txn)
|
||||
{
|
||||
MDB_txn *txn;
|
||||
|
||||
if (mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn)) {
|
||||
fprintf(stderr, "ndb_migrate_lower_user_search_indices: ndb_txn_begin failed\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
// just drop the search db so we can rebuild it
|
||||
if (mdb_drop(txn, ndb->lmdb.dbs[NDB_DB_PROFILE_SEARCH], 0)) {
|
||||
if (mdb_drop(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE_SEARCH], 0)) {
|
||||
fprintf(stderr, "ndb_migrate_lower_user_search_indices: mdb_drop failed\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
mdb_txn_commit(txn);
|
||||
|
||||
return ndb_migrate_user_search_indices(ndb);
|
||||
return ndb_migrate_user_search_indices(txn);
|
||||
}
|
||||
|
||||
int ndb_process_profile_note(struct ndb_note *note, struct ndb_profile_record_builder *profile);
|
||||
|
||||
|
||||
int ndb_db_version(struct ndb *ndb)
|
||||
int ndb_db_version(struct ndb_txn *txn)
|
||||
{
|
||||
int rc;
|
||||
uint64_t version, version_key;
|
||||
MDB_val k, v;
|
||||
MDB_txn *txn;
|
||||
|
||||
version_key = NDB_META_KEY_VERSION;
|
||||
k.mv_data = &version_key;
|
||||
k.mv_size = sizeof(version_key);
|
||||
|
||||
if ((rc = mdb_txn_begin(ndb->lmdb.env, NULL, 0, &txn))) {
|
||||
fprintf(stderr, "ndb_db_version: mdb_txn_begin failed, error %d\n", rc);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mdb_get(txn, ndb->lmdb.dbs[NDB_DB_NDB_META], &k, &v)) {
|
||||
if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &k, &v)) {
|
||||
version = -1;
|
||||
} else {
|
||||
if (v.mv_size != 8) {
|
||||
@@ -1717,7 +1684,6 @@ int ndb_db_version(struct ndb *ndb)
|
||||
version = *((uint64_t*)v.mv_data);
|
||||
}
|
||||
|
||||
mdb_txn_abort(txn);
|
||||
return version;
|
||||
}
|
||||
|
||||
@@ -1857,30 +1823,29 @@ static inline int ndb_writer_queue_msg(struct ndb_writer *writer,
|
||||
return prot_queue_push(&writer->inbox, msg);
|
||||
}
|
||||
|
||||
static int ndb_migrate_utf8_profile_names(struct ndb *ndb)
|
||||
static uint64_t ndb_write_note_and_profile(struct ndb_txn *txn, struct ndb_writer_profile *profile, unsigned char *scratch, size_t scratch_size, uint32_t ndb_flags);
|
||||
static int ndb_migrate_utf8_profile_names(struct ndb_txn *txn)
|
||||
{
|
||||
int rc;
|
||||
MDB_cursor *cur;
|
||||
MDB_val k, v;
|
||||
void *profile_root;
|
||||
NdbProfileRecord_table_t record;
|
||||
struct ndb_txn txn;
|
||||
struct ndb_note *note, *copied_note;
|
||||
uint64_t note_key;
|
||||
size_t len;
|
||||
int count, failed;
|
||||
struct ndb_writer_msg out;
|
||||
int count, failed, ret;
|
||||
struct ndb_writer_profile profile;
|
||||
|
||||
if (!ndb_begin_rw_query(ndb, &txn)) {
|
||||
fprintf(stderr, "ndb_migrate_utf8_profile_names: ndb_begin_rw_query failed\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ((rc = mdb_cursor_open(txn.mdb_txn, ndb->lmdb.dbs[NDB_DB_PROFILE], &cur))) {
|
||||
if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_PROFILE], &cur))) {
|
||||
fprintf(stderr, "ndb_migrate_utf8_profile_names: mdb_cursor_open failed, error %d\n", rc);
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t scratch_size = 8 * 1024 * 1024;
|
||||
unsigned char *scratch = malloc(scratch_size);
|
||||
|
||||
ret = 1;
|
||||
count = 0;
|
||||
failed = 0;
|
||||
|
||||
@@ -1889,14 +1854,14 @@ static int ndb_migrate_utf8_profile_names(struct ndb *ndb)
|
||||
profile_root = v.mv_data;
|
||||
record = NdbProfileRecord_as_root(profile_root);
|
||||
note_key = NdbProfileRecord_note_key(record);
|
||||
note = ndb_get_note_by_key(&txn, note_key, &len);
|
||||
note = ndb_get_note_by_key(txn, note_key, &len);
|
||||
|
||||
if (note == NULL) {
|
||||
fprintf(stderr, "ndb_migrate_utf8_profile_names: note lookup failed\n");
|
||||
return 0;
|
||||
failed++;
|
||||
continue;
|
||||
}
|
||||
|
||||
struct ndb_profile_record_builder *b = &out.profile.record;
|
||||
struct ndb_profile_record_builder *b = &profile.record;
|
||||
|
||||
// reprocess profile
|
||||
if (!ndb_process_profile_note(note, b)) {
|
||||
@@ -1908,13 +1873,14 @@ static int ndb_migrate_utf8_profile_names(struct ndb *ndb)
|
||||
copied_note = malloc(len);
|
||||
memcpy(copied_note, note, len);
|
||||
|
||||
out.type = NDB_WRITER_PROFILE;
|
||||
out.profile.note.note = copied_note;
|
||||
out.profile.note.note_len = len;
|
||||
profile.note.note = copied_note;
|
||||
profile.note.note_len = len;
|
||||
|
||||
ndb_writer_queue_msg(&ndb->writer, &out);
|
||||
|
||||
count++;
|
||||
// we don't pass in flags when migrating... a bit sketchy but
|
||||
// whatever. noone is using this to customize nostrdb atm
|
||||
if (ndb_write_note_and_profile(txn, &profile, scratch, scratch_size, 0)) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
fprintf(stderr, "migrated %d profiles to fix utf8 profile names\n", count);
|
||||
@@ -1923,11 +1889,10 @@ static int ndb_migrate_utf8_profile_names(struct ndb *ndb)
|
||||
fprintf(stderr, "failed to migrate %d profiles to fix utf8 profile names\n", failed);
|
||||
}
|
||||
|
||||
free(scratch);
|
||||
mdb_cursor_close(cur);
|
||||
|
||||
ndb_end_query(&txn);
|
||||
|
||||
return 1;
|
||||
return ret;
|
||||
}
|
||||
|
||||
static struct ndb_migration MIGRATIONS[] = {
|
||||
@@ -4227,29 +4192,6 @@ static uint64_t ndb_write_note(struct ndb_txn *txn,
|
||||
return note_key;
|
||||
}
|
||||
|
||||
// only to be called from the writer thread
|
||||
static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
|
||||
{
|
||||
int rc;
|
||||
MDB_val key, val;
|
||||
uint64_t version_key;
|
||||
|
||||
version_key = NDB_META_KEY_VERSION;
|
||||
|
||||
key.mv_data = &version_key;
|
||||
key.mv_size = sizeof(version_key);
|
||||
val.mv_data = &version;
|
||||
val.mv_size = sizeof(version);
|
||||
|
||||
if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) {
|
||||
ndb_debug("write version to ndb_meta failed: %s\n",
|
||||
mdb_strerror(rc));
|
||||
return;
|
||||
}
|
||||
|
||||
//fprintf(stderr, "writing version %" PRIu64 "\n", version);
|
||||
}
|
||||
|
||||
static void ndb_monitor_lock(struct ndb_monitor *mon) {
|
||||
pthread_mutex_lock(&mon->mutex);
|
||||
}
|
||||
@@ -4311,6 +4253,93 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
|
||||
ndb_monitor_unlock(monitor);
|
||||
}
|
||||
|
||||
uint64_t ndb_write_note_and_profile(
|
||||
struct ndb_txn *txn,
|
||||
struct ndb_writer_profile *profile,
|
||||
unsigned char *scratch,
|
||||
size_t scratch_size,
|
||||
uint32_t ndb_flags)
|
||||
{
|
||||
uint64_t note_nkey;
|
||||
|
||||
note_nkey = ndb_write_note(txn, &profile->note, scratch, scratch_size, ndb_flags);
|
||||
|
||||
if (profile->record.builder) {
|
||||
// only write if parsing didn't fail
|
||||
ndb_write_profile(txn, profile, note_nkey);
|
||||
}
|
||||
|
||||
return note_nkey;
|
||||
}
|
||||
|
||||
// only to be called from the writer thread
|
||||
static int ndb_write_version(struct ndb_txn *txn, uint64_t version)
|
||||
{
|
||||
int rc;
|
||||
MDB_val key, val;
|
||||
uint64_t version_key;
|
||||
|
||||
version_key = NDB_META_KEY_VERSION;
|
||||
|
||||
key.mv_data = &version_key;
|
||||
key.mv_size = sizeof(version_key);
|
||||
val.mv_data = &version;
|
||||
val.mv_size = sizeof(version);
|
||||
|
||||
if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NDB_META], &key, &val, 0))) {
|
||||
ndb_debug("write version to ndb_meta failed: %s\n",
|
||||
mdb_strerror(rc));
|
||||
return 0;
|
||||
}
|
||||
|
||||
//fprintf(stderr, "writing version %" PRIu64 "\n", version);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
static int ndb_run_migrations(struct ndb_txn *txn)
|
||||
{
|
||||
int64_t version, latest_version, i;
|
||||
|
||||
latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]);
|
||||
|
||||
if ((version = ndb_db_version(txn)) == -1) {
|
||||
ndb_debug("run_migrations: no version found, assuming new db\n");
|
||||
version = latest_version;
|
||||
|
||||
// no version found. fresh db?
|
||||
if (!ndb_write_version(txn, version)) {
|
||||
fprintf(stderr, "run_migrations: failed writing db version");
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
} else {
|
||||
ndb_debug("ndb: version %" PRIu64 " found\n", version);
|
||||
}
|
||||
|
||||
if (version < latest_version)
|
||||
fprintf(stderr, "nostrdb: migrating v%d -> v%d\n",
|
||||
(int)version, (int)latest_version);
|
||||
|
||||
for (i = version; i < latest_version; i++) {
|
||||
if (!MIGRATIONS[i].fn(txn)) {
|
||||
fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!ndb_write_version(txn, i+1)) {
|
||||
fprintf(stderr, "run_migrations: failed writing db version");
|
||||
return 0;
|
||||
}
|
||||
|
||||
version = i+1;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
static void *ndb_writer_thread(void *data)
|
||||
{
|
||||
ndb_debug("started writer thread\n");
|
||||
@@ -4346,6 +4375,7 @@ static void *ndb_writer_thread(void *data)
|
||||
case NDB_WRITER_DBMETA: needs_commit = 1; break;
|
||||
case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break;
|
||||
case NDB_WRITER_BLOCKS: needs_commit = 1; break;
|
||||
case NDB_WRITER_MIGRATE: needs_commit = 1; break;
|
||||
case NDB_WRITER_QUIT: break;
|
||||
}
|
||||
}
|
||||
@@ -4369,24 +4399,22 @@ static void *ndb_writer_thread(void *data)
|
||||
continue;
|
||||
case NDB_WRITER_PROFILE:
|
||||
note_nkey =
|
||||
ndb_write_note(&txn, &msg->note,
|
||||
scratch, scratch_size,
|
||||
writer->ndb_flags);
|
||||
ndb_write_note_and_profile(
|
||||
&txn,
|
||||
&msg->profile,
|
||||
scratch,
|
||||
scratch_size,
|
||||
writer->ndb_flags);
|
||||
|
||||
if (note_nkey > 0) {
|
||||
written_notes[num_notes++] =
|
||||
(struct written_note){
|
||||
.note_id = note_nkey,
|
||||
.note = &msg->note,
|
||||
.note = &msg->profile.note,
|
||||
};
|
||||
} else {
|
||||
ndb_debug("failed to write note\n");
|
||||
}
|
||||
if (msg->profile.record.builder) {
|
||||
// only write if parsing didn't fail
|
||||
ndb_write_profile(&txn, &msg->profile,
|
||||
note_nkey);
|
||||
}
|
||||
break;
|
||||
case NDB_WRITER_NOTE:
|
||||
note_nkey = ndb_write_note(&txn, &msg->note,
|
||||
scratch,
|
||||
@@ -4407,6 +4435,12 @@ static void *ndb_writer_thread(void *data)
|
||||
ndb_write_blocks(&txn, msg->blocks.note_key,
|
||||
msg->blocks.blocks);
|
||||
break;
|
||||
case NDB_WRITER_MIGRATE:
|
||||
if (!ndb_run_migrations(&txn)) {
|
||||
mdb_txn_abort(txn.mdb_txn);
|
||||
goto bail;
|
||||
}
|
||||
break;
|
||||
case NDB_WRITER_PROFILE_LAST_FETCH:
|
||||
ndb_writer_last_profile_fetch(&txn,
|
||||
msg->last_fetch.pubkey,
|
||||
@@ -4443,6 +4477,7 @@ static void *ndb_writer_thread(void *data)
|
||||
}
|
||||
}
|
||||
|
||||
bail:
|
||||
free(scratch);
|
||||
ndb_debug("quitting writer thread\n");
|
||||
return NULL;
|
||||
@@ -4753,50 +4788,6 @@ static int ndb_queue_write_version(struct ndb *ndb, uint64_t version)
|
||||
return ndb_writer_queue_msg(&ndb->writer, &msg);
|
||||
}
|
||||
|
||||
static int ndb_run_migrations(struct ndb *ndb)
|
||||
{
|
||||
int64_t version, latest_version, i;
|
||||
|
||||
latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]);
|
||||
|
||||
if ((version = ndb_db_version(ndb)) == -1) {
|
||||
ndb_debug("run_migrations: no version found, assuming new db\n");
|
||||
version = latest_version;
|
||||
|
||||
// no version found. fresh db?
|
||||
if (!ndb_queue_write_version(ndb, version)) {
|
||||
fprintf(stderr, "run_migrations: failed writing db version");
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
} else {
|
||||
ndb_debug("ndb: version %" PRIu64 " found\n", version);
|
||||
}
|
||||
|
||||
if (version < latest_version)
|
||||
fprintf(stderr, "nostrdb: migrating v%d -> v%d\n",
|
||||
(int)version, (int)latest_version);
|
||||
|
||||
for (i = version; i < latest_version; i++) {
|
||||
if (!MIGRATIONS[i].fn(ndb)) {
|
||||
fprintf(stderr, "run_migrations: migration v%d -> v%d failed\n", (int)i, (int)(i+1));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!ndb_queue_write_version(ndb, i+1)) {
|
||||
fprintf(stderr, "run_migrations: failed writing db version");
|
||||
return 0;
|
||||
}
|
||||
|
||||
version = i+1;
|
||||
}
|
||||
|
||||
ndb->version = version;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb,
|
||||
void *sub_cb_ctx)
|
||||
{
|
||||
@@ -4861,10 +4852,9 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) &&
|
||||
!ndb_run_migrations(ndb)) {
|
||||
fprintf(stderr, "failed to run migrations\n");
|
||||
return 0;
|
||||
if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE)) {
|
||||
struct ndb_writer_msg msg = { .type = NDB_WRITER_MIGRATE };
|
||||
ndb_writer_queue_msg(&ndb->writer, &msg);
|
||||
}
|
||||
|
||||
// Initialize LMDB environment and spin up threads
|
||||
|
||||
@@ -460,7 +460,7 @@ int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[3
|
||||
|
||||
// NDB
|
||||
int ndb_init(struct ndb **ndb, const char *dbdir, const struct ndb_config *);
|
||||
int ndb_db_version(struct ndb *ndb);
|
||||
int ndb_db_version(struct ndb_txn *txn);
|
||||
int ndb_process_event(struct ndb *, const char *json, int len);
|
||||
int ndb_process_events(struct ndb *, const char *ldjson, size_t len);
|
||||
#ifndef _WIN32
|
||||
|
||||
Reference in New Issue
Block a user