nostrdb: plan: add created_at query plan

This introduces the basic created_at query plan. We scan the created_at
+ id index in descending order looking for items that match a filter.
This is a very general query plan, but might not be very efficient for
anything other than local timelines.

Changelog-Added: Add general created_at query plan for timelines
Closes: https://github.com/damus-io/nostrdb/issues/26
Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2024-03-12 18:55:23 +00:00
committed by Daniel D’Aquino
parent 25e91b386c
commit bd17dcfac6

View File

@@ -992,12 +992,13 @@ static int compare_ids(const void *pa, const void *pb)
static int search_ids(const void *ctx, const void *mid_ptr)
{
struct search_id_state *state;
unsigned char *mid_id;
uint32_t mid;
state = (struct search_id_state *)ctx;
mid = *(uint32_t *)mid_ptr;
unsigned char *mid_id = ndb_filter_elements_data(state->filter, mid);
mid_id = ndb_filter_elements_data(state->filter, mid);
assert(mid_id);
return memcmp(state->key, mid_id, 32);
@@ -1780,7 +1781,7 @@ static void *ndb_lookup_by_key(struct ndb_txn *txn, uint64_t key,
k.mv_size = sizeof(key);
if (mdb_get(txn->mdb_txn, txn->lmdb->dbs[store], &k, &v)) {
ndb_debug("ndb_get_profile_by_pubkey: mdb_get note failed\n");
ndb_debug("ndb_lookup_by_key: mdb_get note failed\n");
return NULL;
}
@@ -2707,6 +2708,73 @@ static int ndb_encode_tag_key(unsigned char *buf, int buf_size,
return writer.p - writer.start;
}
static int ndb_query_plan_execute_created_at(struct ndb_txn *txn,
struct ndb_filter *filter,
struct ndb_query_results *results,
int limit)
{
MDB_dbi db;
MDB_val k, v;
MDB_cursor *cur;
int rc;
struct ndb_note *note;
struct ndb_tsid key, *pkey;
uint64_t *pint, until, since, note_id, note_size;
struct ndb_query_result res;
unsigned char high_key[32] = {0xFF};
db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
until = UINT64_MAX;
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
until = *pint;
since = 0;
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_SINCE)))
since = *pint;
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
return 0;
// if we have until, start there, otherwise just use max
ndb_tsid_init(&key, high_key, until);
k.mv_data = &key;
k.mv_size = sizeof(key);
if (!ndb_cursor_start(cur, &k, &v))
return 1;
while (!query_is_full(results, limit)) {
pkey = (struct ndb_tsid *)k.mv_data;
note_id = *(uint64_t*)v.mv_data;
assert(v.mv_size == 8);
// TODO(perf): if we are only looking for IDs and have no other
// condition, then we can use the ID in the index without
// looking up the note. For now we always look up the note
if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
goto next;
// does this entry match our filter?
if (!ndb_filter_matches_with(filter, note, 0))
goto next;
// don't continue the scan if we're below `since`
if (pkey->timestamp < since)
break;
ndb_query_result_init(&res, note, note_size, note_id);
if (!push_query_result(results, &res))
break;
next:
if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
break;
}
mdb_cursor_close(cur);
return 1;
}
static int ndb_query_plan_execute_tags(struct ndb_txn *txn,
struct ndb_filter *filter,
struct ndb_query_results *results,
@@ -2769,7 +2837,7 @@ static int ndb_query_plan_execute_tags(struct ndb_txn *txn,
note_id = *(uint64_t*)v.mv_data;
if (!(note = ndb_get_note_by_key(txn, note_id, &note_size)))
continue;
goto next;
if (!ndb_filter_matches_with(filter, note, 1 << NDB_FILTER_TAGS))
goto next;
@@ -2881,6 +2949,18 @@ static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter)
return NDB_PLAN_CREATED;
}
static const char *ndb_query_plan_name(int plan_id)
{
switch (plan_id) {
case NDB_PLAN_IDS: return "ids";
case NDB_PLAN_KINDS: return "kinds";
case NDB_PLAN_TAGS: return "tags";
case NDB_PLAN_CREATED: return "created";
case NDB_PLAN_AUTHORS: return "authors";
}
return "unknown";
}
static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
struct ndb_query_result *res, int capacity,
@@ -2900,7 +2980,7 @@ static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
&results.cur);
plan = ndb_filter_plan(filter);
ndb_debug("using query plan %d\n", plan);
ndb_debug("using query plan '%s'\n", ndb_query_plan_name(plan));
switch (plan) {
// We have a list of ids, just open a cursor and jump to each once
case NDB_PLAN_IDS:
@@ -2918,9 +2998,12 @@ static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
if (!ndb_query_plan_execute_tags(txn, filter, &results, limit))
return 0;
break;
// TODO: finish query execution plans!
case NDB_PLAN_CREATED:
if (!ndb_query_plan_execute_created_at(txn, filter, &results, limit))
return 0;
break;
case NDB_PLAN_AUTHORS:
// TODO: finish authors query plan
return 0;
}
@@ -2934,6 +3017,7 @@ int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters,
int i, out;
struct ndb_query_result *p = results;
out = 0;
*count = 0;
for (i = 0; i < num_filters; i++) {