nostrdb/Query Plans

Instead of running queries off filters directly, we do some simple
heuristics and determine a reasonable query plan for the given filter.

To test this, also add a kind index query plan and add a test for it.

We still need tag, author, and created_at index scans. This is up next!

Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2024-01-05 20:45:45 -08:00
committed by Daniel D’Aquino
parent d598e178c1
commit a45f4d3087
2 changed files with 231 additions and 209 deletions

View File

@@ -203,13 +203,22 @@ struct ndb {
// lmdb environ handles, etc
};
// We get the KeyMatchResult function from the scan_cursor_type
// This function is used to match the key for the corresponding cursor type.
// For example, KIND scanners will look for a kind
enum ndb_scan_cursor_type {
NDB_SCAN_KIND,
NDB_SCAN_PK_KIND,
NDB_SCAN_ID,
///
/// Query Plans
///
/// There are general strategies for performing certain types of query
/// depending on the filter. For example, for large contact list queries
/// with many authors, we simply do a descending scan on created_at
/// instead of doing 1000s of pubkey scans.
///
/// Query plans are calculated from filters via `ndb_filter_plan`
///
enum ndb_query_plan {
// Chosen when the filter has kinds but no ids, tags or authors.
// Executed by scanning the note kind index.
NDB_PLAN_KINDS,
// Chosen whenever the filter has ids; takes priority over every other
// plan. Executed by jumping to each id in the note id index.
NDB_PLAN_IDS,
// Chosen when the filter has authors but no ids or tags.
// Execution is not implemented yet.
NDB_PLAN_AUTHORS,
// Fallback when the filter has no ids, tags, authors or kinds.
// Execution is not implemented yet.
NDB_PLAN_CREATED,
// Chosen when the filter has tags but no ids.
// Execution is not implemented yet.
NDB_PLAN_TAGS,
};
// A clustered key with an id and a timestamp
@@ -1498,7 +1507,8 @@ int ndb_write_last_profile_fetch(struct ndb *ndb, const unsigned char *pubkey,
// after the first element, so we have to go back one.
static int ndb_cursor_start(MDB_cursor *cur, MDB_val *k, MDB_val *v)
{
// Position cursor at the next key greater than or equal to the specified key
// Position cursor at the next key greater than or equal to the
// specified key
if (mdb_cursor_get(cur, k, v, MDB_SET_RANGE)) {
// Failed :(. It could be the last element?
if (mdb_cursor_get(cur, k, v, MDB_LAST))
@@ -2292,8 +2302,9 @@ static int ndb_filter_group_add_filters(struct ndb_filter_group *group,
return 1;
}
static int ndb_filter_int(struct ndb_filter *filter,
enum ndb_filter_fieldtype typ, uint64_t *lim)
static struct ndb_filter_elements *
ndb_filter_get_elems(struct ndb_filter *filter, enum ndb_filter_fieldtype typ)
{
int i;
struct ndb_filter_elements *els;
@@ -2301,114 +2312,35 @@ static int ndb_filter_int(struct ndb_filter *filter,
for (i = 0; i < filter->num_elements; i++) {
els = filter->elements[i];
if (els->field.type == typ) {
*lim = els->elements[0].integer;
return 1;
return els;
}
}
return 0;
return NULL;
}
static int ndb_filter_get_limit(struct ndb_filter *filter, uint64_t *lim)
static union ndb_filter_element *
ndb_filter_get_elem(struct ndb_filter *filter, enum ndb_filter_fieldtype typ)
{
return ndb_filter_int(filter, NDB_FILTER_LIMIT, lim);
struct ndb_filter_elements *els;
if ((els = ndb_filter_get_elems(filter, typ)))
return &els->elements[0];
return NULL;
}
static int ndb_filter_get_until(struct ndb_filter *filter, uint64_t *lim)
static uint64_t *ndb_filter_get_int(struct ndb_filter *filter,
enum ndb_filter_fieldtype typ)
{
return ndb_filter_int(filter, NDB_FILTER_UNTIL, lim);
}
static int ndb_filter_get_since(struct ndb_filter *filter, uint64_t *lim)
{
return ndb_filter_int(filter, NDB_FILTER_SINCE, lim);
}
static int ndb_query_filter_kind(struct ndb_txn *txn, struct ndb_filter *filter,
MDB_cursor *cur, uint64_t kind, uint64_t since,
int *matched, struct ndb_query_result *res)
{
MDB_val k, v;
uint64_t note_id;
struct ndb_u64_tsid tsid, *ptsid;
res->note = NULL;
ndb_u64_tsid_init(&tsid, kind, since);
k.mv_data = &tsid;
k.mv_size = sizeof(tsid);
if (!ndb_cursor_start(cur, &k, &v))
union ndb_filter_element *el = NULL;
if (!(el = ndb_filter_get_elem(filter, typ)))
return 0;
ptsid = (struct ndb_u64_tsid *)k.mv_data;
note_id = *(uint64_t*)v.mv_data;
if (kind == ptsid->u64)
*matched |= 1 << NDB_FILTER_KINDS;
else
return 1;
// get the note because we need it to match against the filter
if (!(res->note = ndb_get_note_by_key(txn, note_id, NULL)))
return 1;
// Sure this particular lookup matched the index query, but does it
// match the entire filter? Check! We also pass in things we've already
// matched via the filter so we don't have to check again. This can be
// pretty important for filters with a large number of entries.
if (!ndb_filter_matches_with(filter, res->note, *matched))
return 1;
return 2;
return &el->integer;
}
static int ndb_query_filter_id(struct ndb_txn *txn, struct ndb_filter *filter,
MDB_cursor *cur, const unsigned char *id,
uint64_t since, int *matched,
struct ndb_query_result *res)
{
MDB_val k, v;
uint64_t note_id;
struct ndb_tsid tsid, *ptsid;
res->note = NULL;
ndb_tsid_init(&tsid, (unsigned char *)id, since);
k.mv_data = &tsid;
k.mv_size = sizeof(tsid);
if (!ndb_cursor_start(cur, &k, &v))
return 0;
ptsid = (struct ndb_tsid *)k.mv_data;
note_id = *(uint64_t*)v.mv_data;
if (memcmp(id, ptsid->id, 32) == 0)
*matched |= 1 << NDB_FILTER_AUTHORS;
else
return 1;
// get the note because we need it to match against the filter
if (!(res->note = ndb_get_note_by_key(txn, note_id, NULL)))
return 1;
// Sure this particular lookup matched the index query, but does it
// match the entire filter? Check! We also pass in things we've already
// matched via the filter so we don't have to check again. This can be
// pretty important for filters with a large number of entries.
if (!ndb_filter_matches_with(filter, res->note, *matched))
return 1;
return 2;
}
static inline int push_query_result(struct cursor *res,
static inline int push_query_result(struct ndb_query_results *results,
struct ndb_query_result *result)
{
return cursor_push(res, (unsigned char*)result, sizeof(*result));
return cursor_push(&results->cur, (unsigned char*)result, sizeof(*result));
}
static int compare_query_results(const void *pa, const void *pb)
@@ -2427,133 +2359,218 @@ static int compare_query_results(const void *pa, const void *pb)
}
}
static int query_is_full(struct cursor *results, int limit)
static void ndb_query_result_init(struct ndb_query_result *res,
struct ndb_note *note,
uint64_t note_id)
{
if (results->p >= results->end)
return 1;
return cursor_count(results, sizeof(struct ndb_query_result)) >= limit;
*res = (struct ndb_query_result){
.note_id = note_id,
.note = note,
};
}
static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
struct ndb_query_result *results, int capacity,
int *results_out)
// Decide whether result collection should stop.
//
// Returns 1 when the result cursor's position has reached its end, or when
// the number of stored results is at least `limit`. Returns 0 otherwise.
static int query_is_full(struct ndb_query_results *results, int limit)
{
	// still room in the buffer: fullness is decided by the result count
	if (results->cur.p < results->cur.end)
		return cursor_count(&results->cur,
				    sizeof(struct ndb_query_result)) >= limit;

	// position is at (or past) the end of the buffer
	return 1;
}
static int ndb_query_plan_execute_ids(struct ndb_txn *txn,
struct ndb_filter *filter,
struct ndb_query_results *results,
int limit
)
{
struct ndb_filter_elements *els;
struct ndb_query_result res;
struct cursor results_arr;
uint64_t limit, since, until, kind;
const unsigned char *id;
int i, k, rc, matched;
MDB_cursor *cur;
MDB_dbi db;
MDB_val k, v;
int matched, rc, i;
struct ndb_filter_elements *ids;
struct ndb_note *note;
struct ndb_query_result res;
struct ndb_tsid tsid, *ptsid;
uint64_t note_id, until, *pint;
unsigned char *id;
since = UINT64_MAX;
matched = 0;
until = UINT64_MAX;
limit = capacity;
ndb_filter_get_limit(filter, &limit);
ndb_filter_get_since(filter, &since);
ndb_filter_get_until(filter, &until);
if (!(ids = ndb_filter_get_elems(filter, NDB_FILTER_IDS)))
return 0;
limit = min(capacity, limit);
make_cursor((unsigned char *)results,
((unsigned char *)results) + limit * sizeof(*results),
&results_arr);
if ((pint = ndb_filter_get_int(filter, NDB_FILTER_UNTIL)))
until = *pint;
for (i = 0; i < filter->num_elements; i++) {
matched = 0;
if (query_is_full(&results_arr, limit))
goto done;
db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
return 0;
els = filter->elements[i];
switch (els->field.type) {
case NDB_FILTER_IDS:
db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
return 0;
// for each id in our ids filter, find in the db
for (i = 0; i < ids->count; i++) {
if (query_is_full(results, limit))
break;
// for each id in our ids filter, find in the db
for (k = 0; k < els->count; k++) {
if (query_is_full(&results_arr, limit)) {
mdb_cursor_close(cur);
goto done;
}
id = (unsigned char*)ids->elements[i].id;
ndb_tsid_init(&tsid, (unsigned char *)id, until);
id = els->elements[k].id;
if (!(rc = ndb_query_filter_id(txn, filter, cur,
id, since,
&matched,
&res))) {
// there was a fatal error
mdb_cursor_close(cur);
return 0;
}
k.mv_data = &tsid;
k.mv_size = sizeof(tsid);
// no match, just try next id
if (rc == 1)
continue;
if (!ndb_cursor_start(cur, &k, &v))
continue;
// rc > 1, matched!
if (!push_query_result(&results_arr, &res)) {
// this should never happen, but if
// it fails to push that means there
// are no more result to push,
// so just return
mdb_cursor_close(cur);
goto done;
}
ptsid = (struct ndb_tsid *)k.mv_data;
note_id = *(uint64_t*)v.mv_data;
// look for more ids... continue!
if (memcmp(id, ptsid->id, 32) == 0)
matched |= 1 << NDB_FILTER_AUTHORS;
else
continue;
// get the note because we need it to match against the filter
if (!(note = ndb_get_note_by_key(txn, note_id, NULL)))
continue;
// Sure this particular lookup matched the index query, but
// does it match the entire filter? Check! We also pass in
// things we've already matched via the filter so we don't have
// to check again. This can be pretty important for filters
// with a large number of entries.
if (!ndb_filter_matches_with(filter, note, matched))
continue;
ndb_query_result_init(&res, note, note_id);
if (!push_query_result(results, &res))
break;
}
mdb_cursor_close(cur);
return 1;
}
static int ndb_query_plan_execute_kinds(struct ndb_txn *txn,
struct ndb_filter *filter,
struct ndb_query_results *results,
int limit)
{
MDB_cursor *cur;
MDB_dbi db;
MDB_val k, v;
struct ndb_note *note;
struct ndb_u64_tsid tsid, *ptsid;
struct ndb_filter_elements *kinds;
struct ndb_query_result res;
uint64_t kind, note_id;
int i, rc;
// we should have kinds in a kinds filter!
if (!(kinds = ndb_filter_get_elems(filter, NDB_FILTER_KINDS)))
return 0;
db = txn->lmdb->dbs[NDB_DB_NOTE_KIND];
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
return 0;
for (i = 0; i < kinds->count; i++) {
if (query_is_full(results, limit))
break;
kind = kinds->elements[i].integer;
ndb_debug("kind %" PRIu64 "\n", kind);
ndb_u64_tsid_init(&tsid, kind, UINT64_MAX);
k.mv_data = &tsid;
k.mv_size = sizeof(tsid);
if (!ndb_cursor_start(cur, &k, &v))
continue;
// walk backwards through the index entries for this kind until we
// reach a different kind or the results are full
while (!query_is_full(results, limit)) {
ptsid = (struct ndb_u64_tsid *)k.mv_data;
if (ptsid->u64 != kind)
break;
note_id = *(uint64_t*)v.mv_data;
if ((note = ndb_get_note_by_key(txn, note_id, NULL))) {
ndb_query_result_init(&res, note, note_id);
if (!push_query_result(results, &res))
break;
}
mdb_cursor_close(cur);
break;
case NDB_FILTER_AUTHORS:
break;
case NDB_FILTER_KINDS:
db = txn->lmdb->dbs[NDB_DB_NOTE_KIND];
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
return 0;
// for each id in our ids filter, find in the db
for (k = 0; k < els->count; k++) {
if (query_is_full(&results_arr, limit)) {
mdb_cursor_close(cur);
goto done;
}
kind = els->elements[k].integer;
if (!(rc = ndb_query_filter_kind(txn, filter,
cur, kind,
since,
&matched,
&res))) {
// there was a fatal error
mdb_cursor_close(cur);
return 0;
}
// rc > 1, matched!
if (!push_query_result(&results_arr, &res)) {
mdb_cursor_close(cur);
goto done;
}
}
mdb_cursor_close(cur);
break;
case NDB_FILTER_GENERIC:
break;
case NDB_FILTER_SINCE:
case NDB_FILTER_UNTIL:
case NDB_FILTER_LIMIT:
break;
if (mdb_cursor_get(cur, &k, &v, MDB_PREV))
break;
}
}
done:
*results_out = cursor_count(&results_arr, sizeof(*results));
mdb_cursor_close(cur);
return 1;
}
// Pick a query plan for `filter` based on which field types it contains.
//
// Precedence, highest first: ids, tags, authors, kinds. A filter with none
// of these fields falls back to NDB_PLAN_CREATED.
//
// This is roughly similar to the heuristic in strfry's dbscan.
static enum ndb_query_plan ndb_filter_plan(struct ndb_filter *filter)
{
	if (ndb_filter_get_elems(filter, NDB_FILTER_IDS))
		return NDB_PLAN_IDS;

	if (ndb_filter_get_elems(filter, NDB_FILTER_TAGS))
		return NDB_PLAN_TAGS;

	if (ndb_filter_get_elems(filter, NDB_FILTER_AUTHORS))
		return NDB_PLAN_AUTHORS;

	if (ndb_filter_get_elems(filter, NDB_FILTER_KINDS))
		return NDB_PLAN_KINDS;

	return NDB_PLAN_CREATED;
}
// Run a single filter against the database and store the matches in `res`.
//
// txn:         transaction the query runs in
// filter:      filter to execute; its query plan is picked by
//              ndb_filter_plan
// res:         caller-provided array that receives the results
// capacity:    number of entries `res` can hold
// results_out: receives the number of results written to `res`; it is
//              always written, and is 0 on failure
//
// At most min(capacity, filter limit) results are produced.
//
// Returns 1 on success and 0 on failure. Filters whose plan is not
// implemented yet (created_at, authors, tags) fail with 0, as does a
// negative capacity.
static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
			    struct ndb_query_result *res, int capacity,
			    int *results_out)
{
	struct ndb_query_results results;
	uint64_t limit, *pint;

	// give the caller a defined count on every return path
	*results_out = 0;

	// A negative capacity would wrap to a huge value once widened to
	// uint64_t, letting the filter's own limit size the result cursor
	// past the end of `res`.
	if (capacity < 0)
		return 0;

	// start from the buffer capacity and only ever shrink it to the
	// filter's limit
	limit = (uint64_t)capacity;
	if ((pint = ndb_filter_get_int(filter, NDB_FILTER_LIMIT)) &&
	    *pint < limit)
		limit = *pint;

	make_cursor((unsigned char *)res,
		    ((unsigned char *)res) + limit * sizeof(*res),
		    &results.cur);

	// limit <= capacity here, so narrowing it to int below is lossless
	switch (ndb_filter_plan(filter)) {
	// We have a list of ids, just open a cursor and jump to each once
	case NDB_PLAN_IDS:
		if (!ndb_query_plan_execute_ids(txn, filter, &results,
						(int)limit))
			return 0;
		break;

	// We have just kinds, just scan the kind index
	case NDB_PLAN_KINDS:
		if (!ndb_query_plan_execute_kinds(txn, filter, &results,
						  (int)limit))
			return 0;
		break;

	// TODO: finish query execution plans!
	case NDB_PLAN_CREATED:
	case NDB_PLAN_AUTHORS:
	case NDB_PLAN_TAGS:
		return 0;
	}

	*results_out = cursor_count(&results.cur, sizeof(*res));
	return 1;
}

View File

@@ -399,6 +399,11 @@ struct ndb_block_iterator {
// A single note matched by a query.
struct ndb_query_result {
// the matched note
struct ndb_note *note;
// key the note was loaded with; the query plans read it from the value
// of the index entry that matched
uint64_t note_id;
};
// Accumulator for the results of a query.
struct ndb_query_results {
// cursor over the caller-provided ndb_query_result array; each match is
// appended to it with cursor_push
struct cursor cur;
};
// CONFIG