nostrdb: Relay queries

Add support for relay-based filtering in nostr queries.

Filters can now include a "relays" field. Optimal performance when
you include a kind as well:

{"relays":["wss://pyramid.fiatjaf.com/"], "kinds":[1]}

This corresponds to a `ndb` query like so:

$ ndb query -r wss://pyramid.fiatjaf.com/ -k 1 -l 1
using filter '{"relays":["wss://pyramid.fiatjaf.com/"],"kinds":[1],"limit":1}'
1 results in 0.094929 ms
{"id":"277dd4ed26d0b44576..}

Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2025-03-21 21:02:14 -07:00
committed by Daniel D’Aquino
parent af2298dcb7
commit 0f66e87faf
2 changed files with 388 additions and 79 deletions

View File

@@ -238,6 +238,7 @@ enum ndb_query_plan {
NDB_PLAN_CREATED,
NDB_PLAN_TAGS,
NDB_PLAN_SEARCH,
NDB_PLAN_RELAY_KINDS,
};
// A id + u64 + timestamp
@@ -751,6 +752,7 @@ static const char *ndb_filter_field_name(enum ndb_filter_fieldtype field)
case NDB_FILTER_UNTIL: return "until";
case NDB_FILTER_LIMIT: return "limit";
case NDB_FILTER_SEARCH: return "search";
case NDB_FILTER_RELAYS: return "relays";
}
return "unknown";
@@ -862,6 +864,15 @@ static int ndb_filter_add_element(struct ndb_filter *filter, union ndb_filter_el
}
// push a pointer of the string in the databuf as an element
break;
case NDB_FILTER_RELAYS:
if (current->field.elem_type != NDB_ELEMENT_STRING) {
return 0;
}
if (!cursor_push(&filter->data_buf, (unsigned char *)el.string.string, el.string.len))
return 0;
if (!cursor_push_byte(&filter->data_buf, 0))
return 0;
break;
}
if (!cursor_push(&filter->elem_buf, (unsigned char *)&offset,
@@ -920,6 +931,7 @@ int ndb_filter_add_str_element_len(struct ndb_filter *filter, const char *str, i
return 0;
}
break;
case NDB_FILTER_RELAYS:
case NDB_FILTER_TAGS:
break;
}
@@ -950,6 +962,7 @@ int ndb_filter_add_int_element(struct ndb_filter *filter, uint64_t integer)
case NDB_FILTER_AUTHORS:
case NDB_FILTER_TAGS:
case NDB_FILTER_SEARCH:
case NDB_FILTER_RELAYS:
return 0;
case NDB_FILTER_KINDS:
case NDB_FILTER_SINCE:
@@ -981,6 +994,7 @@ int ndb_filter_add_id_element(struct ndb_filter *filter, const unsigned char *id
case NDB_FILTER_LIMIT:
case NDB_FILTER_KINDS:
case NDB_FILTER_SEARCH:
case NDB_FILTER_RELAYS:
return 0;
case NDB_FILTER_IDS:
case NDB_FILTER_AUTHORS:
@@ -1086,6 +1100,31 @@ static int compare_ids(const void *pa, const void *pb)
return memcmp(a, b, 32);
}
static int compare_strs(const void *pa, const void *pb)
{
const char *a = *(const char **)pa;
const char *b = *(const char **)pb;
return strcmp(a, b);
}
static int search_strs(const void *ctx, const void *mstr_ptr)
{
// we reuse search_id_state here and just cast to (const char *) when
// needed
struct search_id_state *state;
const char *mstr_str;
uint32_t mstr;
state = (struct search_id_state *)ctx;
mstr = *(uint32_t *)mstr_ptr;
mstr_str = (const char *)ndb_filter_elements_data(state->filter, mstr);
assert(mstr_str);
return strcmp((const char *)state->key, mstr_str);
}
static int search_ids(const void *ctx, const void *mid_ptr)
{
struct search_id_state *state;
@@ -1120,7 +1159,8 @@ static int compare_kinds(const void *pa, const void *pb)
//
// returns 1 if a filter matches a note
static int ndb_filter_matches_with(struct ndb_filter *filter,
struct ndb_note *note, int already_matched)
struct ndb_note *note, int already_matched,
struct ndb_note_relay_iterator *relay_iter)
{
int i, j;
struct ndb_filter_elements *els;
@@ -1139,12 +1179,28 @@ static int ndb_filter_matches_with(struct ndb_filter *filter,
continue;
switch (els->field.type) {
case NDB_FILTER_KINDS:
for (j = 0; j < els->count; j++) {
if ((unsigned int)els->elements[j] == note->kind)
goto cont;
case NDB_FILTER_KINDS:
for (j = 0; j < els->count; j++) {
if ((unsigned int)els->elements[j] == note->kind)
goto cont;
}
break;
case NDB_FILTER_RELAYS:
// for each relay the note was seen on, see if any match
if (!relay_iter) {
assert(!"expected relay iterator...");
break;
}
while ((state.key = (unsigned char *)ndb_note_relay_iterate_next(relay_iter))) {
// relays in filters are always sorted
if (bsearch(&state, &els->elements[0], els->count,
sizeof(els->elements[0]), search_strs)) {
ndb_note_relay_iterate_close(relay_iter);
goto cont;
}
}
ndb_note_relay_iterate_close(relay_iter);
break;
case NDB_FILTER_IDS:
state.key = ndb_note_id(note);
if (bsearch(&state, &els->elements[0], els->count,
@@ -1201,7 +1257,14 @@ cont:
int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note)
{
return ndb_filter_matches_with(filter, note, 0);
return ndb_filter_matches_with(filter, note, 0, NULL);
}
int ndb_filter_matches_with_relay(struct ndb_filter *filter,
struct ndb_note *note,
struct ndb_note_relay_iterator *note_relay_iter)
{
return ndb_filter_matches_with(filter, note, 0, note_relay_iter);
}
// because elements are stored as offsets and qsort doesn't support context,
@@ -1298,6 +1361,9 @@ void ndb_filter_end_field(struct ndb_filter *filter)
case NDB_FILTER_AUTHORS:
sort_filter_elements(filter, cur, compare_ids);
break;
case NDB_FILTER_RELAYS:
sort_filter_elements(filter, cur, compare_strs);
break;
case NDB_FILTER_KINDS:
qsort(&cur->elements[0], cur->count,
sizeof(cur->elements[0]), compare_kinds);
@@ -1535,7 +1601,7 @@ static int prepare_relay_buf(char *relay_buf, int bufsize, const char *relay,
// Write to the note_id -> relay_url database. This records where notes
// have been seen
static int ndb_write_note_relay(struct ndb_txn *txn, uint64_t note_key,
const char *relay, int relay_len)
const char *relay, uint8_t relay_len)
{
char relay_buf[256];
int rc, len;
@@ -1575,12 +1641,90 @@ static int ndb_write_note_relay(struct ndb_txn *txn, uint64_t note_key,
return 1;
}
static int ndb_write_note_relay_kind_index(struct ndb_txn *txn,
uint64_t kind,
uint64_t note_key,
uint64_t created_at,
const char *relay,
int relay_len)
struct ndb_relay_kind_key {
uint64_t note_key;
uint64_t kind;
uint64_t created_at;
uint8_t relay_len;
const char *relay;
};
static int ndb_relay_kind_key_init(
struct ndb_relay_kind_key *key,
uint64_t note_key,
uint64_t kind,
uint64_t created_at,
const char *relay)
{
if (relay == NULL)
return 0;
key->relay = relay;
key->relay_len = strlen(relay);
if (key->relay_len > 248)
return 0;
key->note_key = note_key;
key->kind = kind;
key->created_at = created_at;
return 1;
}
// create a range key for a relay kind query
static int ndb_relay_kind_key_init_high(
struct ndb_relay_kind_key *key,
const char *relay,
uint64_t kind,
uint64_t until)
{
return ndb_relay_kind_key_init(key, UINT64_MAX, kind, UINT64_MAX, relay);
}
static void ndb_parse_relay_kind_key(struct ndb_relay_kind_key *key, unsigned char *buf)
{
// WE ARE ASSUMING WE ARE PARSING FROM AN ALIGNED BUFFER HERE
assert((uint64_t)buf % 8 == 0);
// - note_id: 00 + 8 bytes
// - kind: 08 + 8 bytes
// - created_at: 16 + 8 bytes
// - relay_url_size: 24 + 1 byte
// - relay_url: 25 + n byte null-terminated string
// - pad to 8 byte alignment
key->note_key = *(uint64_t*) (buf + 0);
key->kind = *(uint64_t*) (buf + 8);
key->created_at = *(uint64_t*) (buf + 16);
key->relay_len = *(uint8_t*) (buf + 24);
key->relay = (const char*) (buf + 25);
}
static void ndb_debug_relay_kind_key(struct ndb_relay_kind_key *key)
{
ndb_debug("note_key:%" PRIu64 " kind:%" PRIu64 " created_at:%" PRIu64 " '%s'\n",
key->note_key, key->kind, key->created_at, key->relay);
}
static int ndb_build_relay_kind_key(unsigned char *buf, int bufsize, struct ndb_relay_kind_key *key)
{
struct cursor cur;
make_cursor(buf, buf + bufsize, &cur);
if (!cursor_push(&cur, (unsigned char *)&key->note_key, 8)) return 0;
if (!cursor_push(&cur, (unsigned char *)&key->kind, 8)) return 0;
if (!cursor_push(&cur, (unsigned char *)&key->created_at, 8)) return 0;
if (!cursor_push_byte(&cur, key->relay_len)) return 0;
if (!cursor_push(&cur, (unsigned char *)key->relay, key->relay_len)) return 0;
if (!cursor_push_byte(&cur, 0)) return 0;
if (!cursor_align(&cur, 8)) return 0;
assert(((cur.p-cur.start)%8) == 0);
return cur.p - cur.start;
}
static int ndb_write_note_relay_kind_index(
struct ndb_txn *txn,
struct ndb_relay_kind_key *key)
{
// The relay kind key has a layout like so
//
@@ -1592,33 +1736,20 @@ static int ndb_write_note_relay_kind_index(struct ndb_txn *txn,
// - pad to 8 byte alignment
unsigned char buf[256];
int rc;
struct cursor cur;
int rc, len;
MDB_val k, v;
// come on bro
if (relay_len > 248)
if (key->relay_len > 248 || key->relay == NULL || key->relay_len == 0)
return 0;
if (relay == NULL || relay_len == 0)
ndb_debug("writing note_relay_kind_index '%s' for notekey:%" PRIu64 "\n", key->relay, key->note_key);
if (!(len = ndb_build_relay_kind_key(buf, sizeof(buf), key)))
return 0;
ndb_debug("writing note_relay_kind_index '%s' for notekey:%" PRIu64 "\n", relay, note_key);
make_cursor(buf, buf + sizeof(buf), &cur);
if (!cursor_push(&cur, (unsigned char *)&note_key, 8)) return 0;
if (!cursor_push(&cur, (unsigned char *)&kind, 8)) return 0;
if (!cursor_push(&cur, (unsigned char *)&created_at, 8)) return 0;
if (!cursor_push_byte(&cur, (uint8_t)relay_len)) return 0;
if (!cursor_push(&cur, (unsigned char *)relay, relay_len)) return 0;
if (!cursor_push_byte(&cur, 0)) return 0;
if (!cursor_align(&cur, 8)) return 0;
assert(((cur.p-cur.start)%8) == 0);
k.mv_data = cur.start;
k.mv_size = cur.p - cur.start;
k.mv_data = buf;
k.mv_size = len;
v.mv_data = NULL;
v.mv_size = 0;
@@ -1632,6 +1763,14 @@ static int ndb_write_note_relay_kind_index(struct ndb_txn *txn,
return 1;
}
// writes the relay note kind index and the note_id -> relay db
static int ndb_write_note_relay_indexes(struct ndb_txn *txn, struct ndb_relay_kind_key *key)
{
ndb_write_note_relay_kind_index(txn, key);
ndb_write_note_relay(txn, key->note_key, key->relay, key->relay_len);
return 1;
}
static int ndb_write_note_pubkey_index(struct ndb_txn *txn, struct ndb_note *note,
uint64_t note_key)
{
@@ -2212,17 +2351,23 @@ int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey,
// after the first element, so we have to go back one.
static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v)
{
int rc;
// Position cursor at the next key greater than or equal to the
// specified key
if (mdb_cursor_get(cur, k, v, MDB_SET_RANGE)) {
if ((rc = mdb_cursor_get(cur, k, v, MDB_SET_RANGE))) {
// Failed :(. It could be the last element?
if (mdb_cursor_get(cur, k, v, MDB_LAST))
if ((rc = mdb_cursor_get(cur, k, v, MDB_LAST))) {
ndb_debug("MDB_LAST failed: '%s'\n", mdb_strerror(rc));
return 0;
}
} else {
// if set range worked and our key exists, it should be
// the one right before this one
if (mdb_cursor_get(cur, k, v, MDB_PREV))
if ((rc = mdb_cursor_get(cur, k, v, MDB_PREV))) {
ndb_debug("moving back failed: '%s'\n", mdb_strerror(rc));
return 0;
}
}
return 1;
@@ -3415,7 +3560,7 @@ static int ndb_query_plan_execute_ids(struct ndb_txn *txn,
MDB_cursor *cur;
MDB_dbi db;
MDB_val k, v;
int rc, i;
int rc, i, need_relays = 0;
struct ndb_filter_elements *ids;
struct ndb_note *note;
struct ndb_query_result res;
@@ -3423,6 +3568,7 @@ static int ndb_query_plan_execute_ids(struct ndb_txn *txn,
uint64_t note_id, until, *pint;
size_t note_size;
unsigned char *id;
struct ndb_note_relay_iterator note_relay_iter;
until = UINT64_MAX;
@@ -3432,6 +3578,9 @@ static int ndb_query_plan_execute_ids(struct ndb_txn *txn,
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
until = *pint;
if (ndb_filter_find_elements(filter, NDB_FILTER_RELAYS))
need_relays = 1;
db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
return 0;
@@ -3460,12 +3609,16 @@ static int ndb_query_plan_execute_ids(struct ndb_txn *txn,
if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
continue;
if (need_relays)
ndb_note_relay_iterate_start(txn, &note_relay_iter, note_id);
// Sure this particular lookup matched the index query, but
// does it match the entire filter? Check! We also pass in
// things we've already matched via the filter so we don't have
// to check again. This can be pretty important for filters
// with a large number of entries.
if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_IDS))
if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_IDS,
need_relays ? &note_relay_iter : NULL))
continue;
ndb_query_result_init(&res, note, note_size, note_id);
@@ -3521,7 +3674,7 @@ static int ndb_query_plan_execute_authors(struct ndb_txn *txn,
{
MDB_val k, v;
MDB_cursor *cur;
int rc, i;
int rc, i, need_relays = 0;
uint64_t *pint, until, since, note_key;
unsigned char *author;
struct ndb_note *note;
@@ -3529,6 +3682,7 @@ static int ndb_query_plan_execute_authors(struct ndb_txn *txn,
struct ndb_filter_elements *authors;
struct ndb_query_result res;
struct ndb_tsid tsid, *ptsid;
struct ndb_note_relay_iterator note_relay_iter;
enum ndb_dbs db;
db = txn->lmdb->dbs[NDB_DB_NOTE_PUBKEY];
@@ -3544,6 +3698,9 @@ static int ndb_query_plan_execute_authors(struct ndb_txn *txn,
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE)))
since = *pint;
if (ndb_filter_find_elements(filter, NDB_FILTER_RELAYS))
need_relays = 1;
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
return 0;
@@ -3576,8 +3733,15 @@ static int ndb_query_plan_execute_authors(struct ndb_txn *txn,
if (!(note = ndb_get_note_by_key(txn, note_key, &note_size)))
goto next;
if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_AUTHORS))
if (need_relays)
ndb_note_relay_iterate_start(txn, &note_relay_iter, note_key);
if (!ndb_filter_matches_with(filter, note,
1 << NDB_FILTER_AUTHORS,
need_relays ? &note_relay_iter : NULL))
{
goto next;
}
ndb_query_result_init(&res, note, note_size, note_key);
if (!push_query_result(results, &res))
@@ -3601,12 +3765,13 @@ static int ndb_query_plan_execute_created_at(struct ndb_txn *txn,
MDB_dbi db;
MDB_val k, v;
MDB_cursor *cur;
int rc;
int rc, need_relays = 0;
struct ndb_note *note;
struct ndb_tsid key, *pkey;
uint64_t *pint, until, since, note_id;
size_t note_size;
struct ndb_query_result res;
struct ndb_note_relay_iterator note_relay_iter;
unsigned char high_key[32] = {0xFF};
db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
@@ -3619,6 +3784,9 @@ static int ndb_query_plan_execute_created_at(struct ndb_txn *txn,
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE)))
since = *pint;
if (ndb_filter_find_elements(filter, NDB_FILTER_RELAYS))
need_relays = 1;
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
return 0;
@@ -3642,8 +3810,11 @@ static int ndb_query_plan_execute_created_at(struct ndb_txn *txn,
if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
goto next;
if (need_relays)
ndb_note_relay_iterate_start(txn, &note_relay_iter, note_id);
// does this entry match our filter?
if (!ndb_filter_matches_with(filter, note, 0))
if (!ndb_filter_matches_with(filter, note, 0, need_relays ? &note_relay_iter : NULL))
goto next;
ndb_query_result_init(&res, note, (uint64_t)note_size, note_id);
@@ -3666,7 +3837,7 @@ static int ndb_query_plan_execute_tags(struct ndb_txn *txn,
MDB_cursor *cur;
MDB_dbi db;
MDB_val k, v;
int len, taglen, rc, i;
int len, taglen, rc, i, need_relays = 0;
uint64_t *pint, until, note_id;
size_t note_size;
unsigned char key_buffer[255];
@@ -3674,12 +3845,16 @@ static int ndb_query_plan_execute_tags(struct ndb_txn *txn,
struct ndb_filter_elements *tags;
unsigned char *tag;
struct ndb_query_result res;
struct ndb_note_relay_iterator note_relay_iter;
db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS];
if (!(tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS)))
return 0;
if (ndb_filter_find_elements(filter, NDB_FILTER_RELAYS))
need_relays = 1;
until = UINT64_MAX;
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
until = *pint;
@@ -3722,7 +3897,12 @@ static int ndb_query_plan_execute_tags(struct ndb_txn *txn,
if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
goto next;
if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS))
if (need_relays)
ndb_note_relay_iterate_start(txn, &note_relay_iter, note_id);
if (!ndb_filter_matches_with(filter, note,
1 << NDB_FILTER_TAGS,
need_relays ? &note_relay_iter : NULL))
goto next;
ndb_query_result_init(&res, note, note_size, note_id);
@@ -3739,6 +3919,120 @@ next:
return 1;
}
static int ndb_query_plan_execute_relay_kinds(
struct ndb_txn *txn,
struct ndb_filter *filter,
struct ndb_query_results *results,
int limit)
{
MDB_cursor *cur;
MDB_dbi db;
MDB_val k, v;
struct ndb_note *note;
struct ndb_filter_elements *kinds, *relays;
struct ndb_query_result res;
uint64_t kind, note_id, until, since, *pint;
size_t note_size;
const char *relay;
int i, j, rc, len;
struct ndb_relay_kind_key relay_key;
unsigned char keybuf[256];
// we should have kinds in a kinds filter!
if (!(kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS)))
return 0;
if (!(relays = ndb_filter_find_elements(filter, NDB_FILTER_RELAYS)))
return 0;
until = UINT64_MAX;
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
until = *pint;
since = 0;
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE)))
since = *pint;
db = txn->lmdb->dbs[NDB_DB_NOTE_RELAY_KIND];
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
return 0;
for (j = 0; j < relays->count; j++) {
if (query_is_full(results, limit))
break;
if (!(relay = ndb_filter_get_string_element(filter, relays, j)))
continue;
for (i = 0; i < kinds->count; i++) {
if (query_is_full(results, limit))
break;
kind = kinds->elements[i];
ndb_debug("kind %" PRIu64 "\n", kind);
if (!ndb_relay_kind_key_init_high(&relay_key, relay, kind, until)) {
ndb_debug("ndb_relay_kind_key_init_high failed in relay query\n");
continue;
}
if (!(len = ndb_build_relay_kind_key(keybuf, sizeof(keybuf), &relay_key))) {
ndb_debug("ndb_build_relay_kind_key failed in relay query\n");
ndb_debug_relay_kind_key(&relay_key);
continue;
}
k.mv_data = keybuf;
k.mv_size = len;
ndb_debug("starting with key ");
ndb_debug_relay_kind_key(&relay_key);
if (!ndb_cursor_start(cur, &k, &v))
continue;
// scan the kind subindex
while (!query_is_full(results, limit)) {
ndb_parse_relay_kind_key(&relay_key, k.mv_data);
ndb_debug("inside kind subindex ");
ndb_debug_relay_kind_key(&relay_key);
if (relay_key.kind != kind)
break;
if (strcmp(relay_key.relay, relay))
break;
// don't continue the scan if we're below `since`
if (relay_key.created_at < since)
break;
note_id = relay_key.note_key;
if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
goto next;
if (!ndb_filter_matches_with(filter, note,
(1 << NDB_FILTER_KINDS) | (1 << NDB_FILTER_RELAYS),
NULL))
goto next;
ndb_query_result_init(&res, note, note_size, note_id);
if (!push_query_result(results, &res))
break;
next:
if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
break;
}
}
}
mdb_cursor_close(cur);
return 1;
}
static int ndb_query_plan_execute_kinds(struct ndb_txn *txn,
struct ndb_filter *filter,
struct ndb_query_results *results,
@@ -3753,12 +4047,16 @@ static int ndb_query_plan_execute_kinds(struct ndb_txn *txn,
struct ndb_query_result res;
uint64_t kind, note_id, until, since, *pint;
size_t note_size;
int i, rc;
int i, rc, need_relays = 0;
struct ndb_note_relay_iterator note_relay_iter;
// we should have kinds in a kinds filter!
if (!(kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS)))
return 0;
if (ndb_filter_find_elements(filter, NDB_FILTER_RELAYS))
need_relays = 1;
until = UINT64_MAX;
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
until = *pint;
@@ -3800,7 +4098,12 @@ static int ndb_query_plan_execute_kinds(struct ndb_txn *txn,
if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
goto next;
if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_KINDS))
if (need_relays)
ndb_note_relay_iterate_start(txn, &note_relay_iter, note_id);
if (!ndb_filter_matches_with(filter, note,
1 << NDB_FILTER_KINDS,
need_relays ? &note_relay_iter : NULL))
goto next;
ndb_query_result_init(&res, note, note_size, note_id);
@@ -3819,19 +4122,22 @@ next:
static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter)
{
struct ndb_filter_elements *ids, *kinds, *authors, *tags, *search;
struct ndb_filter_elements *ids, *kinds, *authors, *tags, *search, *relays;
ids = ndb_filter_find_elements(filter, NDB_FILTER_IDS);
search = ndb_filter_find_elements(filter, NDB_FILTER_SEARCH);
kinds = ndb_filter_find_elements(filter, NDB_FILTER_KINDS);
authors = ndb_filter_find_elements(filter, NDB_FILTER_AUTHORS);
tags = ndb_filter_find_elements(filter, NDB_FILTER_TAGS);
relays = ndb_filter_find_elements(filter, NDB_FILTER_RELAYS);
// this is rougly similar to the heuristic in strfry's dbscan
if (search) {
return NDB_PLAN_SEARCH;
} else if (ids) {
return NDB_PLAN_IDS;
} else if (relays && kinds && !authors) {
return NDB_PLAN_RELAY_KINDS;
} else if (kinds && authors && authors->count <= 10) {
return NDB_PLAN_AUTHOR_KINDS;
} else if (authors && authors->count <= 10) {
@@ -3845,7 +4151,7 @@ static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter)
return NDB_PLAN_CREATED;
}
static const char *ndb_query_plan_name(int plan_id)
static const char *ndb_query_plan_name(enum ndb_query_plan plan_id)
{
switch (plan_id) {
case NDB_PLAN_IDS: return "ids";
@@ -3854,6 +4160,8 @@ static const char *ndb_query_plan_name(int plan_id)
case NDB_PLAN_TAGS: return "tags";
case NDB_PLAN_CREATED: return "created";
case NDB_PLAN_AUTHORS: return "authors";
case NDB_PLAN_RELAY_KINDS: return "relay_kinds";
case NDB_PLAN_AUTHOR_KINDS: return "author_kinds";
}
return "unknown";
@@ -3884,18 +4192,19 @@ static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
if (!ndb_query_plan_execute_ids(txn, filter, &results, limit))
return 0;
break;
case NDB_PLAN_RELAY_KINDS:
if (!ndb_query_plan_execute_relay_kinds(txn, filter, &results, limit))
return 0;
break;
case NDB_PLAN_SEARCH:
if (!ndb_query_plan_execute_search(txn, filter, &results, limit))
return 0;
break;
// We have just kinds, just scan the kind index
case NDB_PLAN_KINDS:
if (!ndb_query_plan_execute_kinds(txn, filter, &results, limit))
return 0;
break;
case NDB_PLAN_TAGS:
if (!ndb_query_plan_execute_tags(txn, filter, &results, limit))
return 0;
@@ -4671,23 +4980,18 @@ static uint64_t ndb_write_note(struct ndb_txn *txn,
unsigned char *scratch, size_t scratch_size,
uint32_t ndb_flags)
{
int rc, relay_len = 0;
int rc;
uint64_t note_key, kind;
struct ndb_relay_kind_key relay_key;
MDB_dbi note_db;
MDB_val key, val;
kind = note->note->kind;
if (note->relay != NULL)
relay_len = strlen(note->relay);
// let's quickly sanity check if we already have this note
if ((note_key = ndb_get_notekey_by_id(txn, note->note->id))) {
// even if we do we still need to write relay index
ndb_write_note_relay_kind_index(txn, kind, note_key,
ndb_note_created_at(note->note),
note->relay, relay_len);
ndb_write_note_relay(txn, note_key, note->relay, relay_len);
if (ndb_relay_kind_key_init(&relay_key, note_key, kind, ndb_note_created_at(note->note), note->relay))
ndb_write_note_relay_indexes(txn, &relay_key);
return 0;
}
@@ -4713,10 +5017,9 @@ static uint64_t ndb_write_note(struct ndb_txn *txn,
ndb_write_note_tag_index(txn, note->note, note_key);
ndb_write_note_pubkey_index(txn, note->note, note_key);
ndb_write_note_pubkey_kind_index(txn, note->note, note_key);
ndb_write_note_relay_kind_index(txn, kind, note_key,
ndb_note_created_at(note->note),
note->relay, relay_len);
ndb_write_note_relay(txn, note_key, note->relay, relay_len);
if (ndb_relay_kind_key_init(&relay_key, note_key, kind, ndb_note_created_at(note->note), note->relay))
ndb_write_note_relay_indexes(txn, &relay_key);
// only parse content and do fulltext index on text and longform notes
if (kind == 1 || kind == 30023) {
@@ -4890,7 +5193,7 @@ static void *ndb_writer_thread(void *data)
struct ndb_writer *writer = data;
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
struct written_note written_notes[THREAD_QUEUE_BATCH];
int i, popped, done, relay_len, needs_commit, num_notes;
int i, popped, done, needs_commit, num_notes;
uint64_t note_nkey;
struct ndb_txn txn;
unsigned char *scratch;
@@ -4973,18 +5276,15 @@ static void *ndb_writer_thread(void *data)
}
break;
case NDB_WRITER_NOTE_RELAY:
relay_len = strlen(msg->note_relay.relay);
ndb_write_note_relay(&txn,
msg->note_relay.note_key,
msg->note_relay.relay,
relay_len);
ndb_write_note_relay_kind_index(
&txn,
msg->note_relay.kind,
msg->note_relay.note_key,
msg->note_relay.created_at,
msg->note_relay.relay,
relay_len);
struct ndb_relay_kind_key relay_key;
if (ndb_relay_kind_key_init(&relay_key,
msg->note_relay.note_key,
msg->note_relay.kind,
msg->note_relay.created_at,
msg->note_relay.relay))
{
ndb_write_note_relay_indexes(&txn, &relay_key);
}
break;
case NDB_WRITER_DBMETA:
ndb_write_version(&txn, msg->ndb_meta.version);
@@ -5058,7 +5358,7 @@ static void *ndb_ingester_thread(void *data)
int rc;
ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);
ndb_debug("started ingester thread\n");
//ndb_debug("started ingester thread\n");
done = 0;
while (!done) {
@@ -6040,6 +6340,12 @@ int ndb_filter_json(const struct ndb_filter *filter, char *buf, int buflen)
if (!cursor_push_int_str(c, ndb_filter_get_int_element(elems, 0)))
return 0;
break;
case NDB_FILTER_RELAYS:
if (!cursor_push_str(c, "\"relays\":"))
return 0;
if (!cursor_push_json_elem_array(c, filter, elems))
return 0;
break;
}
if (i != filter->num_elements-1) {
@@ -6868,6 +7174,7 @@ static int ndb_filter_parse_json(struct ndb_json_parser *parser,
return 0;
}
break;
case NDB_FILTER_RELAYS:
case NDB_FILTER_TAGS:
if (!ndb_filter_parse_json_elems(parser, filter)) {
ndb_debug("failed to parse filter tags\n");
@@ -7210,7 +7517,7 @@ int ndb_print_relay_kind_index(struct ndb_txn *txn)
printf("relay\tkind\tcreated_at\tnote_id\n");
while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
d = (unsigned char *)k.mv_data;
printf("'%s'\t", (const char *)(d + 25));
printf("%s\t", (const char *)(d + 25));
printf("%" PRIu64 "\t", *(uint64_t*)(d + 8));
printf("%" PRIu64 "\t", *(uint64_t*)(d + 16));
printf("%" PRIu64 "\n", *(uint64_t*)(d + 0));