nostrdb/query: add tag index and tag queries

Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2024-01-08 16:18:30 -08:00
committed by Daniel D’Aquino
parent 05c5a6dacb
commit 8b3c86c5de
2 changed files with 265 additions and 18 deletions

View File

@@ -346,6 +346,32 @@ static int mdb_cmp_memn(const MDB_val *a, const MDB_val *b) {
return diff ? diff : len_diff<0 ? -1 : len_diff;
}
static int ndb_tag_key_compare(const MDB_val *a, const MDB_val *b)
{
MDB_val va, vb;
uint64_t ts_a, ts_b;
int cmp;
va.mv_data = a->mv_data;
va.mv_size = a->mv_size - 8;
vb.mv_data = b->mv_data;
vb.mv_size = b->mv_size - 8;
if ((cmp = mdb_cmp_memn(&va, &vb)))
return cmp;
ts_a = *(uint64_t*)(va.mv_data + va.mv_size);
ts_b = *(uint64_t*)(vb.mv_data + vb.mv_size);
if (ts_a < ts_b)
return -1;
else if (ts_a > ts_b)
return 1;
return 0;
}
static int ndb_text_search_key_compare(const MDB_val *a, const MDB_val *b)
{
struct cursor ca, cb;
@@ -777,6 +803,7 @@ static int ndb_tag_filter_matches(struct ndb_filter_elements *els,
// we should not expect an id
if (str.flag == NDB_PACKED_ID)
continue;
break;
case NDB_ELEMENT_UNKNOWN:
default:
@@ -2454,6 +2481,123 @@ static int ndb_query_plan_execute_ids(struct ndb_txn *txn,
return 1;
}
//
// encode a tag index key
//
// consists of:
//
// u8 tag
// u8 tag_val_len
// [u8] tag_val_bytes
// u64 created_at
//
static int ndb_encode_tag_key(unsigned char *buf, int buf_size,
char tag, const unsigned char *val,
unsigned char val_len,
uint64_t timestamp)
{
struct cursor writer;
int ok;
// quick exit for obvious case where it will be too big. There can be
// values of val_len that still fail, but we just let the writer handle
// those failure cases
if (val_len >= buf_size)
return 0;
make_cursor(buf, buf + buf_size, &writer);
ok =
cursor_push_byte(&writer, tag) &&
cursor_push(&writer, (unsigned char*)val, val_len) &&
cursor_push(&writer, (unsigned char*)&timestamp, sizeof(timestamp));
if (!ok)
return 0;
return writer.p - writer.start;
}
static int ndb_query_plan_execute_tags(struct ndb_txn *txn,
struct ndb_filter *filter,
struct ndb_query_results *results,
int limit)
{
MDB_cursor *cur;
MDB_dbi db;
MDB_val k, v;
int len, taglen, rc, i;
uint64_t *pint, until, note_id;
unsigned char key_buffer[255];
struct ndb_note *note;
struct ndb_filter_elements *tags;
union ndb_filter_element *tag;
struct ndb_query_result res;
db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS];
if (!(tags = ndb_filter_get_elems(filter, NDB_FILTER_TAGS)))
return 0;
until = UINT64_MAX;
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
until = *pint;
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
return 0;
for (i = 0; i < tags->count; i++) {
tag = &tags->elements[i];
taglen = tags->field.elem_type == NDB_ELEMENT_ID
? 32 : strlen(tag->string);
if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer),
tags->field.tag, tag->id, taglen,
until)))
return 0;
k.mv_data = key_buffer;
k.mv_size = len;
if (!ndb_cursor_start(cur, &k, &v))
continue;
// for each id in our ids filter, find in the db
while (!query_is_full(results, limit)) {
// check if tag value matches, bail if not
if (((unsigned char *)k.mv_data)[0] != tags->field.tag)
break;
// check if tag value matches, bail if not
if (taglen != k.mv_size - 9)
break;
if (memcmp(k.mv_data+1, tag->id, k.mv_size-9))
break;
note_id = *(uint64_t*)v.mv_data;
if (!(note = ndb_get_note_by_key(txn, note_id, NULL)))
continue;
if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS))
goto next;
ndb_query_result_init(&res, note, note_id);
if (!push_query_result(results, &res))
break;
next:
if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
break;
}
}
mdb_cursor_close(cur);
return 1;
}
static int ndb_query_plan_execute_kinds(struct ndb_txn *txn,
struct ndb_filter *filter,
struct ndb_query_results *results,
@@ -2503,12 +2647,17 @@ static int ndb_query_plan_execute_kinds(struct ndb_txn *txn,
break;
note_id = *(uint64_t*)v.mv_data;
if ((note = ndb_get_note_by_key(txn, note_id, NULL))) {
ndb_query_result_init(&res, note, note_id);
if (!push_query_result(results, &res))
break;
}
if (!(note = ndb_get_note_by_key(txn, note_id, NULL)))
goto next;
if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_KINDS))
goto next;
ndb_query_result_init(&res, note, note_id);
if (!push_query_result(results, &res))
break;
next:
if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
break;
}
@@ -2530,10 +2679,10 @@ static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter)
// this is rougly similar to the heuristic in strfry's dbscan
if (ids) {
return NDB_PLAN_IDS;
} else if (tags) {
return NDB_PLAN_TAGS;
} else if (authors) {
} else if (authors && authors->count <= 5) {
return NDB_PLAN_AUTHORS;
} else if (tags && tags->count <= 5) {
return NDB_PLAN_TAGS;
} else if (kinds) {
return NDB_PLAN_KINDS;
}
@@ -2548,6 +2697,7 @@ static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
{
struct ndb_query_results results;
uint64_t limit, *pint;
enum ndb_query_plan plan;
limit = capacity;
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT)))
@@ -2558,7 +2708,9 @@ static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
((unsigned char *)res) + limit * sizeof(*res),
&results.cur);
switch (ndb_filter_plan(filter)) {
plan = ndb_filter_plan(filter);
ndb_debug("using query plan %d\n", plan);
switch (plan) {
// We have a list of ids, just open a cursor and jump to each once
case NDB_PLAN_IDS:
if (!ndb_query_plan_execute_ids(txn, filter, &results, limit))
@@ -2571,10 +2723,13 @@ static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
return 0;
break;
case NDB_PLAN_TAGS:
if (!ndb_query_plan_execute_tags(txn, filter, &results, limit))
return 0;
break;
// TODO: finish query execution plans!
case NDB_PLAN_CREATED:
case NDB_PLAN_AUTHORS:
case NDB_PLAN_TAGS:
return 0;
}
@@ -2608,6 +2763,61 @@ int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters,
return 1;
}
static int ndb_write_note_tag_index(struct ndb_txn *txn, struct ndb_note *note,
uint64_t note_key)
{
unsigned char key_buffer[255];
struct ndb_iterator iter;
struct ndb_str tkey, tval;
char tchar;
int len, rc;
MDB_val key, val;
MDB_dbi tags_db;
tags_db = txn->lmdb->dbs[NDB_DB_NOTE_TAGS];
ndb_tags_iterate_start(note, &iter);
while (ndb_tags_iterate_next(&iter)) {
if (iter.tag->count < 2)
continue;
tkey = ndb_tag_str(note, iter.tag, 0);
// we only write indices for 1-char tags.
tchar = tkey.str[0];
if (tchar == 0 || tkey.str[1] != 0)
continue;
tval = ndb_tag_str(note, iter.tag, 1);
len = ndb_str_len(&tval);
if (!(len = ndb_encode_tag_key(key_buffer, sizeof(key_buffer),
tchar, tval.id, (unsigned char)len,
ndb_note_created_at(note)))) {
// this will fail when we try to encode a key that is
// too big
continue;
}
//ndb_debug("writing tag '%c':'data:%d' to index\n", tchar, len);
key.mv_data = key_buffer;
key.mv_size = len;
val.mv_data = &note_key;
val.mv_size = sizeof(note_key);
if ((rc = mdb_put(txn->mdb_txn, tags_db, &key, &val, 0))) {
ndb_debug("write note tag index to db failed: %s\n",
mdb_strerror(rc));
return 0;
}
}
return 1;
}
static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note,
uint64_t note_key)
{
@@ -3202,13 +3412,9 @@ static uint64_t ndb_write_note(struct ndb_txn *txn,
return 0;
}
// write id index key clustered with created_at
if (!ndb_write_note_id_index(txn, note->note, note_key))
return 0;
// write note kind index
if (!ndb_write_note_kind_index(txn, note->note, note_key))
return 0;
ndb_write_note_id_index(txn, note->note, note_key);
ndb_write_note_kind_index(txn, note->note, note_key);
ndb_write_note_tag_index(txn, note->note, note_key);
// only parse content and do fulltext index on text and longform notes
if (note->note->kind == 1 || note->note->kind == 30023) {
@@ -3690,6 +3896,13 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map
return 0;
}
if ((rc = mdb_dbi_open(txn, "note_tags", MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED,
&lmdb->dbs[NDB_DB_NOTE_TAGS]))) {
fprintf(stderr, "mdb_dbi_open note_tags failed: %s\n", mdb_strerror(rc));
return 0;
}
mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_TAGS], ndb_tag_key_compare);
// Commit the transaction
if ((rc = mdb_txn_commit(txn))) {
fprintf(stderr, "mdb_txn_commit failed, error %d\n", rc);
@@ -5025,6 +5238,27 @@ void ndb_config_set_ingest_filter(struct ndb_config *config,
config->filter_context = filter_ctx;
}
int ndb_print_tag_keys(struct ndb_txn *txn)
{
MDB_cursor *cur;
MDB_val k, v;
int i;
if (mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_TAGS], &cur))
return 0;
i = 1;
while (mdb_cursor_get(cur, &k, &v, MDB_NEXT) == 0) {
printf("%d note_tags '%.*s' %" PRIu64 "\n",
i, (int)k.mv_size-8, (const char *)k.mv_data,
*(uint64_t*)(k.mv_data+(k.mv_size-8)));
i++;
}
return 1;
}
int ndb_print_kind_keys(struct ndb_txn *txn)
{
MDB_cursor *cur;
@@ -5098,6 +5332,13 @@ struct ndb_str ndb_tag_str(struct ndb_note *note, struct ndb_tag *tag, int ind)
return ndb_note_str(note, &tag->strs[ind]);
}
int ndb_str_len(struct ndb_str *str)
{
if (str->flag == NDB_PACKED_ID)
return 32;
return strlen(str->str);
}
struct ndb_str ndb_iter_tag_str(struct ndb_iterator *iter, int ind)
{
return ndb_tag_str(iter->note, iter->tag, ind);
@@ -5160,13 +5401,15 @@ void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter)
int ndb_tags_iterate_next(struct ndb_iterator *iter)
{
struct ndb_tags *tags;
if (iter->tag == NULL || iter->index == -1) {
iter->tag = iter->note->tags.tag;
iter->index = 0;
return iter->note->tags.count != 0;
}
struct ndb_tags *tags = &iter->note->tags;
tags = &iter->note->tags;
if (++iter->index < tags->count) {
uint32_t tag_data_size = iter->tag->count * sizeof(iter->tag->strs[0]);
@@ -5260,6 +5503,8 @@ const char *ndb_db_name(enum ndb_dbs db)
return "note_fulltext";
case NDB_DB_NOTE_BLOCKS:
return "note_blocks";
case NDB_DB_NOTE_TAGS:
return "note_tags";
case NDB_DBS:
return "count";
}

View File

@@ -170,6 +170,7 @@ enum ndb_dbs {
NDB_DB_NOTE_KIND, // note kind index
NDB_DB_NOTE_TEXT, // note fulltext index
NDB_DB_NOTE_BLOCKS, // parsed note blocks for rendering
NDB_DB_NOTE_TAGS, // note tags index
NDB_DBS,
};
@@ -502,6 +503,7 @@ unsigned char *ndb_note_pubkey(struct ndb_note *note);
unsigned char *ndb_note_sig(struct ndb_note *note);
void _ndb_note_set_kind(struct ndb_note *note, uint32_t kind);
struct ndb_tags *ndb_note_tags(struct ndb_note *note);
int ndb_str_len(struct ndb_str *str);
// TAGS
void ndb_tags_iterate_start(struct ndb_note *note, struct ndb_iterator *iter);