nostrdb: Initial relay index implementation
Add relay indexing for existing notes This patch introduces a relay index for new notes and notes that have already been stored, allowing the database to track additional relay sources for a given note. Changes: - Added `NDB_WRITER_NOTE_RELAY` to handle relay indexing separately from new note ingestion. - Implemented `ndb_write_note_relay()` and `ndb_write_note_relay_kind_index()` to store relay URLs. - Modified `ndb_ingester_process_event()` to check for existing notes and append relay info if necessary. - Introduced `ndb_note_has_relay()` to prevent duplicate relay entries. - Updated LMDB schema with `NDB_DB_NOTE_RELAYS` (note_id -> relay) and `NDB_DB_NOTE_RELAY_KIND` (relay + kind + created_at -> note). - Refactored `ndb_process_event()` to use `ndb_ingest_meta` for tracking relay sources. - Ensured proper memory management for relay strings in writer thread. With this change, nostrdb can better track where notes are seen across different relays, improving query capabilities for relay-based data retrieval. Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
committed by
Daniel D’Aquino
parent
fcd8131063
commit
684701931d
@@ -129,6 +129,8 @@ struct ndb_ingest_controller
|
||||
{
|
||||
MDB_txn *read_txn;
|
||||
struct ndb_lmdb *lmdb;
|
||||
struct ndb_note *note;
|
||||
uint64_t note_key;
|
||||
};
|
||||
|
||||
enum ndb_writer_msgtype {
|
||||
@@ -139,6 +141,7 @@ enum ndb_writer_msgtype {
|
||||
NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
|
||||
NDB_WRITER_BLOCKS, // write parsed note blocks
|
||||
NDB_WRITER_MIGRATE, // migrate the database
|
||||
NDB_WRITER_NOTE_RELAY, // we already have the note, but we have more relays to write
|
||||
};
|
||||
|
||||
// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
|
||||
@@ -1475,6 +1478,7 @@ static int ndb_db_is_index(enum ndb_dbs index)
|
||||
case NDB_DB_NDB_META:
|
||||
case NDB_DB_PROFILE_SEARCH:
|
||||
case NDB_DB_PROFILE_LAST_FETCH:
|
||||
case NDB_DB_NOTE_RELAYS:
|
||||
case NDB_DBS:
|
||||
return 0;
|
||||
case NDB_DB_PROFILE_PK:
|
||||
@@ -1484,6 +1488,7 @@ static int ndb_db_is_index(enum ndb_dbs index)
|
||||
case NDB_DB_NOTE_TAGS:
|
||||
case NDB_DB_NOTE_PUBKEY:
|
||||
case NDB_DB_NOTE_PUBKEY_KIND:
|
||||
case NDB_DB_NOTE_RELAY_KIND:
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -1499,6 +1504,125 @@ static inline void ndb_id_u64_ts_init(struct ndb_id_u64_ts *key,
|
||||
key->timestamp = timestamp;
|
||||
}
|
||||
|
||||
// formats the relay url buffer for the NDB_DB_NOTE_RELAYS value. It's a
|
||||
// null terminated string padded to 8 bytes (we must keep the entire database
|
||||
// aligned to 8 bytes at all times)
|
||||
static int prepare_relay_buf(char *relay_buf, int bufsize, const char *relay,
|
||||
int relay_len)
|
||||
{
|
||||
struct cursor cur;
|
||||
|
||||
// make sure the size of the buffer is aligned
|
||||
assert((bufsize % 8) == 0);
|
||||
|
||||
make_cursor((unsigned char *)relay_buf, (unsigned char *)relay_buf + bufsize, &cur);
|
||||
|
||||
// push the relay string
|
||||
if (!cursor_push(&cur, (unsigned char *)relay, relay_len))
|
||||
return 0;
|
||||
|
||||
// relay urls are null terminated for convenience
|
||||
if (!cursor_push_byte(&cur, 0))
|
||||
return 0;
|
||||
|
||||
// align the buffer
|
||||
if (!cursor_align(&cur, 8))
|
||||
return 0;
|
||||
|
||||
return cur.p - cur.start;
|
||||
}
|
||||
|
||||
// Write to the note_id -> relay_url database. This records where notes
|
||||
// have been seen
|
||||
static int ndb_write_note_relay(struct ndb_txn *txn, uint64_t note_key,
|
||||
const char *relay, int relay_len)
|
||||
{
|
||||
char relay_buf[256];
|
||||
int rc, len;
|
||||
MDB_val k, v;
|
||||
|
||||
if (relay == NULL || relay_len == 0)
|
||||
return 0;
|
||||
|
||||
if (!(len = prepare_relay_buf(relay_buf, sizeof(relay_buf), relay, relay_len))) {
|
||||
fprintf(stderr, "relay url '%s' too large when writing note relay index\n", relay);
|
||||
return 0;
|
||||
}
|
||||
|
||||
assert((len % 8) == 0);
|
||||
|
||||
k.mv_data = ¬e_key;
|
||||
k.mv_size = sizeof(note_key);
|
||||
|
||||
v.mv_data = relay_buf;
|
||||
v.mv_size = len;
|
||||
|
||||
// NODUPDATA is specified so that we don't accidently add duplicate
|
||||
// key/value pairs
|
||||
if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAYS],
|
||||
&k, &v, MDB_NODUPDATA)))
|
||||
{
|
||||
ndb_debug("ndb_write_note_relay failed: %s\n", mdb_strerror(rc));
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int ndb_write_note_relay_kind_index(struct ndb_txn *txn,
|
||||
uint64_t kind,
|
||||
uint64_t note_key,
|
||||
uint64_t created_at,
|
||||
const char *relay,
|
||||
int relay_len)
|
||||
{
|
||||
// The relay kind key has a layout like so
|
||||
//
|
||||
// - note_key: 00 + 8 bytes
|
||||
// - kind: 08 + 8 bytes
|
||||
// - created_at: 16 + 8 bytes
|
||||
// - relay_url_size: 24 + 1 byte
|
||||
// - relay_url: 25 + n byte null-terminated string
|
||||
// - pad to 8 byte alignment
|
||||
|
||||
unsigned char buf[256];
|
||||
int rc;
|
||||
struct cursor cur;
|
||||
MDB_val k, v;
|
||||
|
||||
// come on bro
|
||||
if (relay_len > 248)
|
||||
return 0;
|
||||
|
||||
if (relay == NULL || relay_len == 0)
|
||||
return 0;
|
||||
|
||||
make_cursor(buf, buf + sizeof(buf), &cur);
|
||||
|
||||
if (!cursor_push(&cur, (unsigned char *)¬e_key, 8)) return 0;
|
||||
if (!cursor_push(&cur, (unsigned char *)&kind, 8)) return 0;
|
||||
if (!cursor_push(&cur, (unsigned char *)&created_at, 8)) return 0;
|
||||
if (!cursor_push_byte(&cur, (uint8_t)relay_len)) return 0;
|
||||
if (!cursor_push(&cur, (unsigned char *)relay, relay_len)) return 0;
|
||||
if (!cursor_align(&cur, 8)) return 0;
|
||||
|
||||
assert(((cur.p-cur.start)%8) == 0);
|
||||
|
||||
k.mv_data = cur.start;
|
||||
k.mv_size = cur.p - cur.start;
|
||||
|
||||
v.mv_data = NULL;
|
||||
v.mv_size = 0;
|
||||
|
||||
if ((rc = mdb_put(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAY_KIND], &k, &v, 0))) {
|
||||
fprintf(stderr, "write note relay kind index failed: %s\n",
|
||||
mdb_strerror(rc));
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int ndb_write_note_pubkey_index(struct ndb_txn *txn, struct ndb_note *note,
|
||||
uint64_t note_key)
|
||||
{
|
||||
@@ -1601,6 +1725,7 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices,
|
||||
case NDB_DB_NDB_META:
|
||||
case NDB_DB_PROFILE_SEARCH:
|
||||
case NDB_DB_PROFILE_LAST_FETCH:
|
||||
case NDB_DB_NOTE_RELAYS:
|
||||
case NDB_DBS:
|
||||
// this should never happen since we check at
|
||||
// the start
|
||||
@@ -1620,6 +1745,9 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices,
|
||||
goto cleanup;
|
||||
}
|
||||
break;
|
||||
case NDB_DB_NOTE_RELAY_KIND:
|
||||
fprintf(stderr, "it doesn't make sense to rebuild note relay kind index\n");
|
||||
return 0;
|
||||
case NDB_DB_NOTE_PUBKEY_KIND:
|
||||
if (!ndb_write_note_pubkey_kind_index(txn, note, note_key)) {
|
||||
count = -1;
|
||||
@@ -1817,14 +1945,23 @@ enum ndb_ingester_msgtype {
|
||||
};
|
||||
|
||||
struct ndb_ingester_event {
|
||||
const char *relay;
|
||||
char *json;
|
||||
unsigned client : 1; // ["EVENT", {...}] messages
|
||||
unsigned len : 31;
|
||||
};
|
||||
|
||||
struct ndb_writer_note_relay {
|
||||
const char *relay;
|
||||
uint64_t note_key;
|
||||
uint64_t kind;
|
||||
uint64_t created_at;
|
||||
};
|
||||
|
||||
struct ndb_writer_note {
|
||||
struct ndb_note *note;
|
||||
size_t note_len;
|
||||
const char *relay;
|
||||
};
|
||||
|
||||
struct ndb_writer_profile {
|
||||
@@ -1862,6 +1999,7 @@ struct ndb_writer_blocks {
|
||||
struct ndb_writer_msg {
|
||||
enum ndb_writer_msgtype type;
|
||||
union {
|
||||
struct ndb_writer_note_relay note_relay;
|
||||
struct ndb_writer_note note;
|
||||
struct ndb_writer_profile profile;
|
||||
struct ndb_writer_ndb_meta ndb_meta;
|
||||
@@ -2075,6 +2213,7 @@ static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v)
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
// get some value based on a clustered id key
|
||||
int ndb_get_tsid(struct ndb_txn *txn, enum ndb_dbs db, const unsigned char *id,
|
||||
MDB_val *val)
|
||||
@@ -2241,9 +2380,10 @@ static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid
|
||||
hex_decode(hexid, 64, id, sizeof(id));
|
||||
|
||||
// let's see if we already have it
|
||||
|
||||
ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn);
|
||||
if (!ndb_has_note(&txn, id))
|
||||
c->note = ndb_get_note_by_id(&txn, id, NULL, &c->note_key);
|
||||
|
||||
if (c->note == NULL)
|
||||
return NDB_IDRES_CONT;
|
||||
|
||||
return NDB_IDRES_STOP;
|
||||
@@ -2330,7 +2470,8 @@ int ndb_process_profile_note(struct ndb_note *note,
|
||||
}
|
||||
|
||||
static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
|
||||
char *json, unsigned len, unsigned client)
|
||||
char *json, unsigned len,
|
||||
unsigned client, const char *relay)
|
||||
{
|
||||
struct ndb_ingester_msg msg;
|
||||
msg.type = NDB_INGEST_EVENT;
|
||||
@@ -2338,14 +2479,22 @@ static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
|
||||
msg.event.json = json;
|
||||
msg.event.len = len;
|
||||
msg.event.client = client;
|
||||
msg.event.relay = relay;
|
||||
|
||||
return threadpool_dispatch(&ingester->tp, &msg);
|
||||
}
|
||||
|
||||
void ndb_ingest_meta_init(struct ndb_ingest_meta *meta, unsigned client, const char *relay)
|
||||
{
|
||||
meta->client = client;
|
||||
meta->relay = relay;
|
||||
}
|
||||
|
||||
static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json,
|
||||
int len, unsigned client)
|
||||
int len, struct ndb_ingest_meta *meta)
|
||||
{
|
||||
const char *relay = meta->relay;
|
||||
|
||||
// Without this, we get bus errors in the json parser inside when
|
||||
// trying to ingest empty kind 6 reposts... we should probably do fuzz
|
||||
// testing on inputs to the json parser
|
||||
@@ -2362,7 +2511,13 @@ static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json,
|
||||
if (json_copy == NULL)
|
||||
return 0;
|
||||
|
||||
return ndb_ingester_queue_event(ingester, json_copy, len, client);
|
||||
if (relay != NULL) {
|
||||
relay = strdup(meta->relay);
|
||||
if (relay == NULL)
|
||||
return 0;
|
||||
}
|
||||
|
||||
return ndb_ingester_queue_event(ingester, json_copy, len, meta->client, relay);
|
||||
}
|
||||
|
||||
|
||||
@@ -2370,9 +2525,12 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
|
||||
struct ndb_note *note,
|
||||
size_t note_size,
|
||||
struct ndb_writer_msg *out,
|
||||
struct ndb_ingester *ingester)
|
||||
struct ndb_ingester *ingester,
|
||||
const char *relay)
|
||||
{
|
||||
enum ndb_ingest_filter_action action;
|
||||
struct ndb_ingest_meta meta;
|
||||
|
||||
action = NDB_INGEST_ACCEPT;
|
||||
|
||||
if (ingester->filter)
|
||||
@@ -2412,24 +2570,81 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
|
||||
} else if (note->kind == 6) {
|
||||
// process the repost if we have a repost event
|
||||
ndb_debug("processing kind 6 repost\n");
|
||||
// dup the relay string
|
||||
ndb_ingest_meta_init(&meta, 0, relay);
|
||||
ndb_ingest_event(ingester, ndb_note_content(note),
|
||||
ndb_note_content_length(note), 0);
|
||||
ndb_note_content_length(note),
|
||||
&meta);
|
||||
}
|
||||
|
||||
out->type = NDB_WRITER_NOTE;
|
||||
out->note.note = note;
|
||||
out->note.note_len = note_size;
|
||||
out->note.relay = relay;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ndb_note_seen_on_relay(struct ndb_txn *txn, uint64_t note_key, const char *relay)
|
||||
{
|
||||
MDB_val k, v;
|
||||
MDB_cursor *cur;
|
||||
int rc, len;
|
||||
char relay_buf[256];
|
||||
|
||||
if (relay == NULL)
|
||||
return 0;
|
||||
|
||||
len = strlen(relay);
|
||||
|
||||
if (!(len = prepare_relay_buf(relay_buf, sizeof(relay_buf), relay, len)))
|
||||
return 0;
|
||||
|
||||
assert((len % 8) == 0);
|
||||
|
||||
k.mv_data = ¬e_key;
|
||||
k.mv_size = sizeof(note_key);
|
||||
|
||||
v.mv_data = relay_buf;
|
||||
v.mv_size = len;
|
||||
|
||||
if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAYS], &cur)) != MDB_SUCCESS)
|
||||
return 0;
|
||||
|
||||
rc = mdb_cursor_get(cur, &k, &v, MDB_GET_BOTH);
|
||||
mdb_cursor_close(cur);
|
||||
|
||||
return rc == MDB_SUCCESS;
|
||||
}
|
||||
|
||||
// process the relay for the note. this is called when we already have the
|
||||
// note in the database but still need to check if the relay needs to be
|
||||
// written to the relay indexes for corresponding note
|
||||
static int ndb_process_note_relay(struct ndb_txn *txn, struct ndb_writer_msg *out,
|
||||
uint64_t note_key, struct ndb_note *note,
|
||||
const char *relay)
|
||||
{
|
||||
// query to see if we already have the relay on this note
|
||||
if (ndb_note_seen_on_relay(txn, note_key, relay)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// if not, tell the writer thread to emit a NOTE_RELAY event
|
||||
out->type = NDB_WRITER_NOTE_RELAY;
|
||||
|
||||
out->note_relay.relay = relay;
|
||||
out->note_relay.note_key = note_key;
|
||||
out->note_relay.kind = ndb_note_kind(note);
|
||||
out->note_relay.created_at = ndb_note_created_at(note);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int ndb_ingester_process_event(secp256k1_context *ctx,
|
||||
struct ndb_ingester *ingester,
|
||||
struct ndb_ingester_event *ev,
|
||||
struct ndb_writer_msg *out,
|
||||
MDB_txn *read_txn
|
||||
)
|
||||
MDB_txn *read_txn)
|
||||
{
|
||||
struct ndb_tce tce;
|
||||
struct ndb_fce fce;
|
||||
@@ -2463,10 +2678,29 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
|
||||
ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) :
|
||||
ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb);
|
||||
|
||||
// This is a result from our special json parser. It parsed the id
|
||||
// and found that we already have it in the database
|
||||
if ((int)note_size == -42) {
|
||||
// we already have this!
|
||||
//ndb_debug("already have id??\n");
|
||||
goto cleanup;
|
||||
assert(controller.note != NULL);
|
||||
assert(controller.note_key != 0);
|
||||
struct ndb_txn txn;
|
||||
ndb_txn_from_mdb(&txn, ingester->lmdb, read_txn);
|
||||
|
||||
// we still need to process the relays on the note even
|
||||
// if we already have it
|
||||
if (ev->relay && ndb_process_note_relay(&txn, out,
|
||||
controller.note_key,
|
||||
controller.note,
|
||||
ev->relay))
|
||||
{
|
||||
// free note buf here since we don't pass the note to the writer thread
|
||||
free(buf);
|
||||
goto success;
|
||||
} else {
|
||||
// we already have the note and there are no new
|
||||
// relays to process. nothing to write.
|
||||
goto cleanup;
|
||||
}
|
||||
} else if (note_size == 0) {
|
||||
ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json);
|
||||
goto cleanup;
|
||||
@@ -2484,13 +2718,12 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
|
||||
}
|
||||
|
||||
if (!ndb_ingester_process_note(ctx, note, note_size,
|
||||
out, ingester)) {
|
||||
out, ingester,
|
||||
ev->relay)) {
|
||||
ndb_debug("failed to process note\n");
|
||||
goto cleanup;
|
||||
} else {
|
||||
// we're done with the original json, free it
|
||||
free(ev->json);
|
||||
return 1;
|
||||
goto success;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -2507,20 +2740,26 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
|
||||
}
|
||||
|
||||
if (!ndb_ingester_process_note(ctx, note, note_size,
|
||||
out, ingester)) {
|
||||
out, ingester,
|
||||
ev->relay)) {
|
||||
ndb_debug("failed to process note\n");
|
||||
goto cleanup;
|
||||
} else {
|
||||
// we're done with the original json, free it
|
||||
free(ev->json);
|
||||
return 1;
|
||||
goto success;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
success:
|
||||
free(ev->json);
|
||||
// we don't free relay or buf since those are passed to the writer thread
|
||||
return 1;
|
||||
|
||||
cleanup:
|
||||
free(ev->json);
|
||||
if (ev->relay)
|
||||
free((void*)ev->relay);
|
||||
free(buf);
|
||||
|
||||
return ok;
|
||||
@@ -2628,6 +2867,68 @@ retry:
|
||||
return 1;
|
||||
}
|
||||
|
||||
//
|
||||
// The relay kind index has a layout like so (so we don't need dupsort)
|
||||
//
|
||||
// - note_id: 00 + 8 bytes
|
||||
// - kind: 08 + 8 bytes
|
||||
// - created_at: 16 + 8 bytes
|
||||
// - relay_url_size: 24 + 1 byte
|
||||
// - relay_url: 25 + n byte null-terminated string
|
||||
// - pad to 8 byte alignment
|
||||
//
|
||||
// The key sort order is:
|
||||
//
|
||||
// relay_url, kind, created_at
|
||||
//
|
||||
static int ndb_relay_kind_cmp(const MDB_val *a, const MDB_val *b)
|
||||
{
|
||||
int cmp;
|
||||
MDB_val va, vb;
|
||||
uint64_t iva, ivb;
|
||||
unsigned char *ad = (unsigned char *)a->mv_data;
|
||||
unsigned char *bd = (unsigned char *)b->mv_data;
|
||||
assert(((uint64_t)a->mv_data % 8) == 0);
|
||||
|
||||
va.mv_size = *(ad + 24);
|
||||
va.mv_data = ad + 25;
|
||||
|
||||
vb.mv_size = *(bd + 24);
|
||||
vb.mv_data = bd + 25;
|
||||
|
||||
cmp = mdb_cmp_memn(&va, &vb);
|
||||
if (cmp) return cmp;
|
||||
|
||||
// kind
|
||||
iva = *(uint64_t*)(ad + 8);
|
||||
ivb = *(uint64_t*)(bd + 8);
|
||||
|
||||
if (iva < ivb)
|
||||
return -1;
|
||||
else if (iva > ivb)
|
||||
return 1;
|
||||
|
||||
// created_at
|
||||
iva = *(uint64_t*)(ad + 16);
|
||||
ivb = *(uint64_t*)(bd + 16);
|
||||
|
||||
if (iva < ivb)
|
||||
return -1;
|
||||
else if (iva > ivb)
|
||||
return 1;
|
||||
|
||||
// note_id (so we don't need dupsort logic)
|
||||
iva = *(uint64_t*)ad;
|
||||
ivb = *(uint64_t*)bd;
|
||||
|
||||
if (iva < ivb)
|
||||
return -1;
|
||||
else if (iva > ivb)
|
||||
return 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b)
|
||||
{
|
||||
int cmp;
|
||||
@@ -4356,7 +4657,7 @@ static uint64_t ndb_write_note(struct ndb_txn *txn,
|
||||
unsigned char *scratch, size_t scratch_size,
|
||||
uint32_t ndb_flags)
|
||||
{
|
||||
int rc;
|
||||
int rc, relay_len = 0;
|
||||
uint64_t note_key, kind;
|
||||
MDB_dbi note_db;
|
||||
MDB_val key, val;
|
||||
@@ -4384,11 +4685,18 @@ static uint64_t ndb_write_note(struct ndb_txn *txn,
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (note->relay != NULL)
|
||||
relay_len = strlen(note->relay);
|
||||
|
||||
ndb_write_note_id_index(txn, note->note, note_key);
|
||||
ndb_write_note_kind_index(txn, note->note, note_key);
|
||||
ndb_write_note_tag_index(txn, note->note, note_key);
|
||||
ndb_write_note_pubkey_index(txn, note->note, note_key);
|
||||
ndb_write_note_pubkey_kind_index(txn, note->note, note_key);
|
||||
ndb_write_note_relay_kind_index(txn, kind, note_key,
|
||||
ndb_note_created_at(note->note),
|
||||
note->relay, relay_len);
|
||||
ndb_write_note_relay(txn, note_key, note->relay, relay_len);
|
||||
|
||||
// only parse content and do fulltext index on text and longform notes
|
||||
if (kind == 1 || kind == 30023) {
|
||||
@@ -4562,7 +4870,7 @@ static void *ndb_writer_thread(void *data)
|
||||
struct ndb_writer *writer = data;
|
||||
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
|
||||
struct written_note written_notes[THREAD_QUEUE_BATCH];
|
||||
int i, popped, done, needs_commit, num_notes;
|
||||
int i, popped, done, relay_len, needs_commit, num_notes;
|
||||
uint64_t note_nkey;
|
||||
struct ndb_txn txn;
|
||||
unsigned char *scratch;
|
||||
@@ -4590,6 +4898,7 @@ static void *ndb_writer_thread(void *data)
|
||||
case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break;
|
||||
case NDB_WRITER_BLOCKS: needs_commit = 1; break;
|
||||
case NDB_WRITER_MIGRATE: needs_commit = 1; break;
|
||||
case NDB_WRITER_NOTE_RELAY: needs_commit = 1; break;
|
||||
case NDB_WRITER_QUIT: break;
|
||||
}
|
||||
}
|
||||
@@ -4643,6 +4952,20 @@ static void *ndb_writer_thread(void *data)
|
||||
};
|
||||
}
|
||||
break;
|
||||
case NDB_WRITER_NOTE_RELAY:
|
||||
relay_len = strlen(msg->note_relay.relay);
|
||||
ndb_write_note_relay(&txn,
|
||||
msg->note_relay.note_key,
|
||||
msg->note_relay.relay,
|
||||
relay_len);
|
||||
ndb_write_note_relay_kind_index(
|
||||
&txn,
|
||||
msg->note_relay.kind,
|
||||
msg->note_relay.note_key,
|
||||
msg->note_relay.created_at,
|
||||
msg->note_relay.relay,
|
||||
relay_len);
|
||||
break;
|
||||
case NDB_WRITER_DBMETA:
|
||||
ndb_write_version(&txn, msg->ndb_meta.version);
|
||||
break;
|
||||
@@ -4683,11 +5006,15 @@ static void *ndb_writer_thread(void *data)
|
||||
msg = &msgs[i];
|
||||
if (msg->type == NDB_WRITER_NOTE) {
|
||||
free(msg->note.note);
|
||||
if (msg->note.relay)
|
||||
free((void*)msg->note.relay);
|
||||
} else if (msg->type == NDB_WRITER_PROFILE) {
|
||||
free(msg->profile.note.note);
|
||||
//ndb_profile_record_builder_free(&msg->profile.record);
|
||||
} else if (msg->type == NDB_WRITER_BLOCKS) {
|
||||
} else if (msg->type == NDB_WRITER_BLOCKS) {
|
||||
ndb_blocks_free(msg->blocks.blocks);
|
||||
} else if (msg->type == NDB_WRITER_NOTE_RELAY) {
|
||||
free((void*)msg->note_relay.relay);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4928,6 +5255,20 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map
|
||||
return 0;
|
||||
}
|
||||
|
||||
// relay kind index. maps <relay_url><kind><created><note_id> primary keys to relay records
|
||||
// see ndb_relay_kind_cmp function for more details on the key format
|
||||
if ((rc = mdb_dbi_open(txn, "relay_kind", MDB_CREATE, &lmdb->dbs[NDB_DB_NOTE_RELAY_KIND]))) {
|
||||
fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc);
|
||||
return 0;
|
||||
}
|
||||
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_RELAY_KIND], ndb_relay_kind_cmp);
|
||||
|
||||
// note_id -> relay index
|
||||
if ((rc = mdb_dbi_open(txn, "note_relays", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED, &lmdb->dbs[NDB_DB_NOTE_RELAYS]))) {
|
||||
fprintf(stderr, "mdb_dbi_open profile last fetch, error %d\n", rc);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// id+ts index flags
|
||||
unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED;
|
||||
|
||||
@@ -5109,6 +5450,7 @@ void ndb_destroy(struct ndb *ndb)
|
||||
free(ndb);
|
||||
}
|
||||
|
||||
|
||||
// Process a nostr event from a client
|
||||
//
|
||||
// ie: ["EVENT", {"content":"..."} ...]
|
||||
@@ -5116,7 +5458,10 @@ void ndb_destroy(struct ndb *ndb)
|
||||
// The client-sent variation of ndb_process_event
|
||||
int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
|
||||
{
|
||||
return ndb_ingest_event(&ndb->ingester, json, len, 1);
|
||||
struct ndb_ingest_meta meta;
|
||||
ndb_ingest_meta_init(&meta, 1, NULL);
|
||||
|
||||
return ndb_ingest_event(&ndb->ingester, json, len, &meta);
|
||||
}
|
||||
|
||||
// Process anostr event from a relay,
|
||||
@@ -5138,25 +5483,32 @@ int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
|
||||
//
|
||||
int ndb_process_event(struct ndb *ndb, const char *json, int json_len)
|
||||
{
|
||||
return ndb_ingest_event(&ndb->ingester, json, json_len, 0);
|
||||
struct ndb_ingest_meta meta;
|
||||
ndb_ingest_meta_init(&meta, 0, NULL);
|
||||
|
||||
return ndb_ingest_event(&ndb->ingester, json, json_len, &meta);
|
||||
}
|
||||
|
||||
int ndb_process_event_with(struct ndb *ndb, const char *json, int json_len,
|
||||
struct ndb_ingest_meta *meta)
|
||||
{
|
||||
return ndb_ingest_event(&ndb->ingester, json, json_len, meta);
|
||||
}
|
||||
|
||||
int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client)
|
||||
int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len,
|
||||
struct ndb_ingest_meta *meta)
|
||||
{
|
||||
const char *start, *end, *very_end;
|
||||
start = ldjson;
|
||||
end = start + json_len;
|
||||
very_end = ldjson + json_len;
|
||||
int (* process)(struct ndb *, const char *, int);
|
||||
#if DEBUG
|
||||
int processed = 0;
|
||||
#endif
|
||||
process = client ? ndb_process_client_event : ndb_process_event;
|
||||
|
||||
while ((end = fast_strchr(start, '\n', very_end - start))) {
|
||||
//printf("processing '%.*s'\n", (int)(end-start), start);
|
||||
if (!process(ndb, start, end - start)) {
|
||||
if (!ndb_process_event_with(ndb, start, end - start, meta)) {
|
||||
ndb_debug("ndb_process_client_event failed\n");
|
||||
return 0;
|
||||
}
|
||||
@@ -5194,14 +5546,26 @@ int ndb_process_events_stream(struct ndb *ndb, FILE* fp)
|
||||
}
|
||||
#endif
|
||||
|
||||
int ndb_process_events_with(struct ndb *ndb, const char *ldjson, size_t json_len,
|
||||
struct ndb_ingest_meta *meta)
|
||||
{
|
||||
return _ndb_process_events(ndb, ldjson, json_len, meta);
|
||||
}
|
||||
|
||||
int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len)
|
||||
{
|
||||
return _ndb_process_events(ndb, ldjson, json_len, 1);
|
||||
struct ndb_ingest_meta meta;
|
||||
ndb_ingest_meta_init(&meta, 1, NULL);
|
||||
|
||||
return _ndb_process_events(ndb, ldjson, json_len, &meta);
|
||||
}
|
||||
|
||||
int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
|
||||
{
|
||||
return _ndb_process_events(ndb, ldjson, json_len, 0);
|
||||
struct ndb_ingest_meta meta;
|
||||
ndb_ingest_meta_init(&meta, 0, NULL);
|
||||
|
||||
return _ndb_process_events(ndb, ldjson, json_len, &meta);
|
||||
}
|
||||
|
||||
static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag)
|
||||
@@ -7086,6 +7450,10 @@ const char *ndb_db_name(enum ndb_dbs db)
|
||||
return "note_pubkey_index";
|
||||
case NDB_DB_NOTE_PUBKEY_KIND:
|
||||
return "note_pubkey_kind_index";
|
||||
case NDB_DB_NOTE_RELAY_KIND:
|
||||
return "note_relay_kind_index";
|
||||
case NDB_DB_NOTE_RELAYS:
|
||||
return "note_relays";
|
||||
case NDB_DBS:
|
||||
return "count";
|
||||
}
|
||||
|
||||
@@ -55,6 +55,11 @@ struct ndb_str {
|
||||
};
|
||||
};
|
||||
|
||||
struct ndb_ingest_meta {
|
||||
unsigned client;
|
||||
const char *relay;
|
||||
};
|
||||
|
||||
struct ndb_keypair {
|
||||
unsigned char pubkey[32];
|
||||
unsigned char secret[32];
|
||||
@@ -189,6 +194,8 @@ enum ndb_dbs {
|
||||
NDB_DB_NOTE_TAGS, // note tags index
|
||||
NDB_DB_NOTE_PUBKEY, // note pubkey index
|
||||
NDB_DB_NOTE_PUBKEY_KIND, // note pubkey kind index
|
||||
NDB_DB_NOTE_RELAY_KIND, // relay+kind+created -> note_id
|
||||
NDB_DB_NOTE_RELAYS, // note_id -> relays
|
||||
NDB_DBS,
|
||||
};
|
||||
|
||||
@@ -475,14 +482,23 @@ int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[3
|
||||
// NDB
|
||||
int ndb_init(struct ndb **ndb, const char *dbdir, const struct ndb_config *);
|
||||
int ndb_db_version(struct ndb_txn *txn);
|
||||
|
||||
// NOTE PROCESSING
|
||||
int ndb_process_event(struct ndb *, const char *json, int len);
|
||||
void ndb_ingest_meta_init(struct ndb_ingest_meta *meta, unsigned client, const char *relay);
|
||||
// Process an event, recording the relay where it came from.
|
||||
int ndb_process_event_with(struct ndb *, const char *json, int len, struct ndb_ingest_meta *meta);
|
||||
int ndb_process_events(struct ndb *, const char *ldjson, size_t len);
|
||||
int ndb_process_events_with(struct ndb *ndb, const char *ldjson, size_t json_len, struct ndb_ingest_meta *meta);
|
||||
#ifndef _WIN32
|
||||
// TODO: fix on windows
|
||||
int ndb_process_events_stream(struct ndb *, FILE* fp);
|
||||
#endif
|
||||
// deprecated: use ndb_ingest_event_with
|
||||
int ndb_process_client_event(struct ndb *, const char *json, int len);
|
||||
// deprecated: use ndb_ingest_events_with
|
||||
int ndb_process_client_events(struct ndb *, const char *json, size_t len);
|
||||
|
||||
int ndb_begin_query(struct ndb *, struct ndb_txn *);
|
||||
int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query);
|
||||
int ndb_search_profile_next(struct ndb_search *search);
|
||||
@@ -497,6 +513,7 @@ uint64_t ndb_get_profilekey_by_pubkey(struct ndb_txn *txn, const unsigned char *
|
||||
struct ndb_note *ndb_get_note_by_id(struct ndb_txn *txn, const unsigned char *id, size_t *len, uint64_t *primkey);
|
||||
struct ndb_note *ndb_get_note_by_key(struct ndb_txn *txn, uint64_t key, size_t *len);
|
||||
void *ndb_get_note_meta(struct ndb_txn *txn, const unsigned char *id, size_t *len);
|
||||
int ndb_note_seen_on_relay(struct ndb_txn *txn, uint64_t note_key, const char *relay);
|
||||
void ndb_destroy(struct ndb *);
|
||||
|
||||
// BUILDER
|
||||
|
||||
Reference in New Issue
Block a user