nostrdb/blocks: write note blocks on ingest
When ingesting notes, parse text/longform contents and store them in nostrdb. Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
committed by
Daniel D’Aquino
parent
088683696a
commit
d063362bd7
@@ -10,6 +10,7 @@
|
|||||||
#include "lmdb.h"
|
#include "lmdb.h"
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
#include "cpu.h"
|
#include "cpu.h"
|
||||||
|
#include "block.h"
|
||||||
#include "threadpool.h"
|
#include "threadpool.h"
|
||||||
#include "protected_queue.h"
|
#include "protected_queue.h"
|
||||||
#include "memchr.h"
|
#include "memchr.h"
|
||||||
@@ -127,6 +128,7 @@ enum ndb_writer_msgtype {
|
|||||||
NDB_WRITER_PROFILE, // write a profile to the db
|
NDB_WRITER_PROFILE, // write a profile to the db
|
||||||
NDB_WRITER_DBMETA, // write ndb metadata
|
NDB_WRITER_DBMETA, // write ndb metadata
|
||||||
NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
|
NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
|
||||||
|
NDB_WRITER_BLOCKS, // write parsed note blocks
|
||||||
};
|
};
|
||||||
|
|
||||||
// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
|
// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
|
||||||
@@ -1141,6 +1143,12 @@ struct ndb_writer_last_fetch {
|
|||||||
uint64_t fetched_at;
|
uint64_t fetched_at;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// write note blocks
|
||||||
|
struct ndb_writer_blocks {
|
||||||
|
struct ndb_blocks *blocks;
|
||||||
|
uint64_t note_key;
|
||||||
|
};
|
||||||
|
|
||||||
// The different types of messages that the writer thread can write to the
|
// The different types of messages that the writer thread can write to the
|
||||||
// database
|
// database
|
||||||
struct ndb_writer_msg {
|
struct ndb_writer_msg {
|
||||||
@@ -1150,6 +1158,7 @@ struct ndb_writer_msg {
|
|||||||
struct ndb_writer_profile profile;
|
struct ndb_writer_profile profile;
|
||||||
struct ndb_writer_ndb_meta ndb_meta;
|
struct ndb_writer_ndb_meta ndb_meta;
|
||||||
struct ndb_writer_last_fetch last_fetch;
|
struct ndb_writer_last_fetch last_fetch;
|
||||||
|
struct ndb_writer_blocks blocks;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -2650,8 +2659,48 @@ cont:
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void ndb_write_blocks(struct ndb_txn *txn, uint64_t note_key,
|
||||||
|
struct ndb_blocks *blocks)
|
||||||
|
{
|
||||||
|
int rc;
|
||||||
|
MDB_val key, val;
|
||||||
|
|
||||||
|
key.mv_data = ¬e_key;
|
||||||
|
key.mv_size = sizeof(note_key);
|
||||||
|
val.mv_data = blocks;
|
||||||
|
val.mv_size = ndb_blocks_total_size(blocks);
|
||||||
|
assert((val.mv_size % 8) == 0);
|
||||||
|
|
||||||
|
if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_BLOCKS], &key, &val, 0))) {
|
||||||
|
ndb_debug("write version to note_blocks failed: %s\n",
|
||||||
|
mdb_strerror(rc));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note,
|
||||||
|
uint64_t note_key, unsigned char *scratch,
|
||||||
|
size_t scratch_size)
|
||||||
|
{
|
||||||
|
size_t content_len;
|
||||||
|
const char *content;
|
||||||
|
struct ndb_blocks *blocks;
|
||||||
|
|
||||||
|
content_len = ndb_note_content_length(note);
|
||||||
|
content = ndb_note_content(note);
|
||||||
|
|
||||||
|
if (!ndb_parse_content(scratch, scratch_size, content, content_len, &blocks)) {
|
||||||
|
ndb_debug("failed to parse content '%.*s'\n", content_len, content);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
ndb_write_blocks(txn, note_key, blocks);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
static uint64_t ndb_write_note(struct ndb_txn *txn,
|
static uint64_t ndb_write_note(struct ndb_txn *txn,
|
||||||
struct ndb_writer_note *note)
|
struct ndb_writer_note *note,
|
||||||
|
unsigned char *scratch, size_t scratch_size)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
uint64_t note_key;
|
uint64_t note_key;
|
||||||
@@ -2687,10 +2736,14 @@ static uint64_t ndb_write_note(struct ndb_txn *txn,
|
|||||||
if (!ndb_write_note_kind_index(txn, note->note, note_key))
|
if (!ndb_write_note_kind_index(txn, note->note, note_key))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
// only do fulltext index on text and longform notes
|
// only parse content and do fulltext index on text and longform notes
|
||||||
if (note->note->kind == 1 || note->note->kind == 30023) {
|
if (note->note->kind == 1 || note->note->kind == 30023) {
|
||||||
if (!ndb_write_note_fulltext_index(txn, note->note, note_key))
|
if (!ndb_write_note_fulltext_index(txn, note->note, note_key))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
// write note blocks
|
||||||
|
ndb_write_new_blocks(txn, note->note, note_key, scratch,
|
||||||
|
scratch_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (note->note->kind == 7) {
|
if (note->note->kind == 7) {
|
||||||
@@ -2727,6 +2780,9 @@ static void *ndb_writer_thread(void *data)
|
|||||||
{
|
{
|
||||||
struct ndb_writer *writer = data;
|
struct ndb_writer *writer = data;
|
||||||
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
|
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
|
||||||
|
// 8mb scratch buffer for parsing note content
|
||||||
|
size_t scratch_size = 8 * 1024 * 1024;
|
||||||
|
unsigned char *scratch = malloc(scratch_size);
|
||||||
int i, popped, done, any_note;
|
int i, popped, done, any_note;
|
||||||
uint64_t note_nkey;
|
uint64_t note_nkey;
|
||||||
MDB_txn *mdb_txn = NULL;
|
MDB_txn *mdb_txn = NULL;
|
||||||
@@ -2747,6 +2803,7 @@ static void *ndb_writer_thread(void *data)
|
|||||||
case NDB_WRITER_PROFILE: any_note = 1; break;
|
case NDB_WRITER_PROFILE: any_note = 1; break;
|
||||||
case NDB_WRITER_DBMETA: any_note = 1; break;
|
case NDB_WRITER_DBMETA: any_note = 1; break;
|
||||||
case NDB_WRITER_PROFILE_LAST_FETCH: any_note = 1; break;
|
case NDB_WRITER_PROFILE_LAST_FETCH: any_note = 1; break;
|
||||||
|
case NDB_WRITER_BLOCKS: any_note = 1; break;
|
||||||
case NDB_WRITER_QUIT: break;
|
case NDB_WRITER_QUIT: break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -2769,7 +2826,8 @@ static void *ndb_writer_thread(void *data)
|
|||||||
continue;
|
continue;
|
||||||
case NDB_WRITER_PROFILE:
|
case NDB_WRITER_PROFILE:
|
||||||
note_nkey =
|
note_nkey =
|
||||||
ndb_write_note(&txn, &msg->note);
|
ndb_write_note(&txn, &msg->note,
|
||||||
|
scratch, scratch_size);
|
||||||
if (msg->profile.record.builder) {
|
if (msg->profile.record.builder) {
|
||||||
// only write if parsing didn't fail
|
// only write if parsing didn't fail
|
||||||
ndb_write_profile(&txn, &msg->profile,
|
ndb_write_profile(&txn, &msg->profile,
|
||||||
@@ -2777,7 +2835,8 @@ static void *ndb_writer_thread(void *data)
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case NDB_WRITER_NOTE:
|
case NDB_WRITER_NOTE:
|
||||||
ndb_write_note(&txn, &msg->note);
|
ndb_write_note(&txn, &msg->note, scratch,
|
||||||
|
scratch_size);
|
||||||
//printf("wrote note ");
|
//printf("wrote note ");
|
||||||
//print_hex(msg->note.note->id, 32);
|
//print_hex(msg->note.note->id, 32);
|
||||||
//printf("\n");
|
//printf("\n");
|
||||||
@@ -2785,6 +2844,10 @@ static void *ndb_writer_thread(void *data)
|
|||||||
case NDB_WRITER_DBMETA:
|
case NDB_WRITER_DBMETA:
|
||||||
ndb_write_version(&txn, msg->ndb_meta.version);
|
ndb_write_version(&txn, msg->ndb_meta.version);
|
||||||
break;
|
break;
|
||||||
|
case NDB_WRITER_BLOCKS:
|
||||||
|
ndb_write_blocks(&txn, msg->blocks.note_key,
|
||||||
|
msg->blocks.blocks);
|
||||||
|
break;
|
||||||
case NDB_WRITER_PROFILE_LAST_FETCH:
|
case NDB_WRITER_PROFILE_LAST_FETCH:
|
||||||
ndb_writer_last_profile_fetch(&txn,
|
ndb_writer_last_profile_fetch(&txn,
|
||||||
msg->last_fetch.pubkey,
|
msg->last_fetch.pubkey,
|
||||||
@@ -2803,15 +2866,18 @@ static void *ndb_writer_thread(void *data)
|
|||||||
// free notes
|
// free notes
|
||||||
for (i = 0; i < popped; i++) {
|
for (i = 0; i < popped; i++) {
|
||||||
msg = &msgs[i];
|
msg = &msgs[i];
|
||||||
if (msg->type == NDB_WRITER_NOTE)
|
if (msg->type == NDB_WRITER_NOTE) {
|
||||||
free(msg->note.note);
|
free(msg->note.note);
|
||||||
else if (msg->type == NDB_WRITER_PROFILE) {
|
} else if (msg->type == NDB_WRITER_PROFILE) {
|
||||||
free(msg->profile.note.note);
|
free(msg->profile.note.note);
|
||||||
ndb_profile_record_builder_free(&msg->profile.record);
|
ndb_profile_record_builder_free(&msg->profile.record);
|
||||||
|
} else if (msg->type == NDB_WRITER_BLOCKS) {
|
||||||
|
ndb_blocks_free(msg->blocks.blocks);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
free(scratch);
|
||||||
ndb_debug("quitting writer thread\n");
|
ndb_debug("quitting writer thread\n");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -3059,24 +3125,30 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map
|
|||||||
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare);
|
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_ID], ndb_tsid_compare);
|
||||||
|
|
||||||
if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) {
|
if ((rc = mdb_dbi_open(txn, "profile_pk", tsid_flags, &lmdb->dbs[NDB_DB_PROFILE_PK]))) {
|
||||||
fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
|
fprintf(stderr, "mdb_dbi_open profile_pk failed: %s\n", mdb_strerror(rc));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare);
|
mdb_set_compare(txn, lmdb->dbs[NDB_DB_PROFILE_PK], ndb_tsid_compare);
|
||||||
|
|
||||||
if ((rc = mdb_dbi_open(txn, "note_kind", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_KIND]))) {
|
if ((rc = mdb_dbi_open(txn, "note_kind", tsid_flags, &lmdb->dbs[NDB_DB_NOTE_KIND]))) {
|
||||||
fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
|
fprintf(stderr, "mdb_dbi_open note_kind failed: %s\n", mdb_strerror(rc));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_tsid_compare);
|
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_KIND], ndb_u64_tsid_compare);
|
||||||
|
|
||||||
if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT,
|
if ((rc = mdb_dbi_open(txn, "note_text", MDB_CREATE | MDB_DUPSORT,
|
||||||
&lmdb->dbs[NDB_DB_NOTE_TEXT]))) {
|
&lmdb->dbs[NDB_DB_NOTE_TEXT]))) {
|
||||||
fprintf(stderr, "mdb_dbi_open id failed: %s\n", mdb_strerror(rc));
|
fprintf(stderr, "mdb_dbi_open note_text failed: %s\n", mdb_strerror(rc));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare);
|
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TEXT], ndb_text_search_key_compare);
|
||||||
|
|
||||||
|
if ((rc = mdb_dbi_open(txn, "note_blocks", MDB_CREATE | MDB_INTEGERKEY,
|
||||||
|
&lmdb->dbs[NDB_DB_NOTE_BLOCKS]))) {
|
||||||
|
fprintf(stderr, "mdb_dbi_open note_blocks failed: %s\n", mdb_strerror(rc));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// Commit the transaction
|
// Commit the transaction
|
||||||
if ((rc = mdb_txn_commit(txn))) {
|
if ((rc = mdb_txn_commit(txn))) {
|
||||||
fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc);
|
fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc);
|
||||||
@@ -4590,9 +4662,85 @@ const char *ndb_db_name(enum ndb_dbs db)
|
|||||||
return "note_kind_index";
|
return "note_kind_index";
|
||||||
case NDB_DB_NOTE_TEXT:
|
case NDB_DB_NOTE_TEXT:
|
||||||
return "note_fulltext";
|
return "note_fulltext";
|
||||||
|
case NDB_DB_NOTE_BLOCKS:
|
||||||
|
return "note_blocks";
|
||||||
case NDB_DBS:
|
case NDB_DBS:
|
||||||
return "count";
|
return "count";
|
||||||
}
|
}
|
||||||
|
|
||||||
return "unknown";
|
return "unknown";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static struct ndb_blocks *ndb_note_to_blocks(struct ndb_note *note)
|
||||||
|
{
|
||||||
|
const char *content;
|
||||||
|
size_t content_len;
|
||||||
|
struct ndb_blocks *blocks;
|
||||||
|
|
||||||
|
content = ndb_note_content(note);
|
||||||
|
content_len = ndb_note_content_length(note);
|
||||||
|
|
||||||
|
// something weird is going on
|
||||||
|
if (content_len >= INT32_MAX)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
unsigned char *buffer = malloc(content_len);
|
||||||
|
if (!buffer)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
if (!ndb_parse_content(buffer, content_len, content, content_len, &blocks)) {
|
||||||
|
free(buffer);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
blocks = realloc(blocks, ndb_blocks_total_size(blocks));
|
||||||
|
if (blocks == NULL)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
blocks->flags |= NDB_BLOCK_FLAG_OWNED;
|
||||||
|
|
||||||
|
return blocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key)
|
||||||
|
{
|
||||||
|
struct ndb_blocks *blocks, *blocks_to_writer;
|
||||||
|
size_t blocks_size;
|
||||||
|
struct ndb_note *note;
|
||||||
|
size_t note_len;
|
||||||
|
|
||||||
|
if ((blocks = ndb_lookup_by_key(txn, note_key, NDB_DB_NOTE_BLOCKS, ¬e_len))) {
|
||||||
|
return blocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we don't have note blocks, let's lazily generate them. This is
|
||||||
|
// migration-friendly instead of doing them all at once
|
||||||
|
if (!(note = ndb_get_note_by_key(txn, note_key, ¬e_len))) {
|
||||||
|
// no note found, can't return note blocks
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(blocks = ndb_note_to_blocks(note)))
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
// send a copy to the writer
|
||||||
|
blocks_size = ndb_blocks_total_size(blocks);
|
||||||
|
blocks_to_writer = malloc(blocks_size);
|
||||||
|
memcpy(blocks_to_writer, blocks, blocks_size);
|
||||||
|
assert(blocks->flags & NDB_BLOCK_FLAG_OWNED);
|
||||||
|
|
||||||
|
// we generated new blocks, let's store them in the DB
|
||||||
|
struct ndb_writer_blocks write_blocks = {
|
||||||
|
.blocks = blocks_to_writer,
|
||||||
|
.note_key = note_key
|
||||||
|
};
|
||||||
|
|
||||||
|
assert(write_blocks.blocks != blocks);
|
||||||
|
|
||||||
|
struct ndb_writer_msg msg = { .type = NDB_WRITER_BLOCKS };
|
||||||
|
msg.blocks = write_blocks;
|
||||||
|
|
||||||
|
ndb_writer_queue_msg(&ndb->writer, &msg);
|
||||||
|
|
||||||
|
return blocks;
|
||||||
|
}
|
||||||
|
|||||||
@@ -168,6 +168,7 @@ enum ndb_dbs {
|
|||||||
NDB_DB_PROFILE_LAST_FETCH,
|
NDB_DB_PROFILE_LAST_FETCH,
|
||||||
NDB_DB_NOTE_KIND, // note kind index
|
NDB_DB_NOTE_KIND, // note kind index
|
||||||
NDB_DB_NOTE_TEXT, // note fulltext index
|
NDB_DB_NOTE_TEXT, // note fulltext index
|
||||||
|
NDB_DB_NOTE_BLOCKS, // parsed note blocks for rendering
|
||||||
NDB_DBS,
|
NDB_DBS,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -474,6 +475,8 @@ size_t ndb_blocks_total_size(struct ndb_blocks *blocks);
|
|||||||
/// Free blocks if they are owned, safe to call on unowned blocks as well.
|
/// Free blocks if they are owned, safe to call on unowned blocks as well.
|
||||||
void ndb_blocks_free(struct ndb_blocks *blocks);
|
void ndb_blocks_free(struct ndb_blocks *blocks);
|
||||||
|
|
||||||
|
// BLOCK DB
|
||||||
|
struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, uint64_t note_key);
|
||||||
|
|
||||||
// BLOCK ITERATORS
|
// BLOCK ITERATORS
|
||||||
struct ndb_block_iterator *ndb_blocks_iterate_start(const char *, struct ndb_blocks *);
|
struct ndb_block_iterator *ndb_blocks_iterate_start(const char *, struct ndb_blocks *);
|
||||||
|
|||||||
Reference in New Issue
Block a user