nostrdb/Initial nostrdb queries
Still a lot more work to do, but this is at least a proof of concept for querying nostrdb using filters. Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
committed by
Daniel D’Aquino
parent
b2080a946e
commit
9f1b9ab945
@@ -836,7 +836,8 @@ static int compare_kinds(const void *pa, const void *pb)
|
|||||||
|
|
||||||
|
|
||||||
// returns 1 if a filter matches a note
|
// returns 1 if a filter matches a note
|
||||||
int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note)
|
static int ndb_filter_matches_with(struct ndb_filter *filter,
|
||||||
|
struct ndb_note *note, int already_matched)
|
||||||
{
|
{
|
||||||
int i, j;
|
int i, j;
|
||||||
unsigned char *id;
|
unsigned char *id;
|
||||||
@@ -845,6 +846,11 @@ int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note)
|
|||||||
for (i = 0; i < filter->num_elements; i++) {
|
for (i = 0; i < filter->num_elements; i++) {
|
||||||
els = filter->elements[i];
|
els = filter->elements[i];
|
||||||
|
|
||||||
|
// if we know we already match from a query scan result,
|
||||||
|
// we can skip this check
|
||||||
|
if ((1 << els->field.type) & already_matched)
|
||||||
|
continue;
|
||||||
|
|
||||||
switch (els->field.type) {
|
switch (els->field.type) {
|
||||||
case NDB_FILTER_KINDS:
|
case NDB_FILTER_KINDS:
|
||||||
for (j = 0; j < els->count; j++) {
|
for (j = 0; j < els->count; j++) {
|
||||||
@@ -891,6 +897,11 @@ cont:
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ndb_filter_matches(struct ndb_filter *filter, struct ndb_note *note)
|
||||||
|
{
|
||||||
|
return ndb_filter_matches_with(filter, note, 0);
|
||||||
|
}
|
||||||
|
|
||||||
void ndb_filter_end_field(struct ndb_filter *filter)
|
void ndb_filter_end_field(struct ndb_filter *filter)
|
||||||
{
|
{
|
||||||
struct ndb_filter_elements *cur;
|
struct ndb_filter_elements *cur;
|
||||||
@@ -2279,22 +2290,232 @@ static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note,
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
static int ndb_filter_group_add_filters(struct ndb_filter_group *group,
|
||||||
static int ndb_filter_query(struct ndb *ndb, struct ndb_filter *filter)
|
struct ndb_filter *filters,
|
||||||
|
int num_filters)
|
||||||
{
|
{
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for (i = 0; i < num_filters; i++) {
|
||||||
|
if (!ndb_filter_group_add(group, &filters[i]))
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int ndb_filter_cursors(struct ndb_filter *filter, struct ndb_cursor)
|
static int ndb_filter_int(struct ndb_filter *filter,
|
||||||
|
enum ndb_filter_fieldtype typ, uint64_t *lim)
|
||||||
{
|
{
|
||||||
|
int i;
|
||||||
|
struct ndb_filter_elements *els;
|
||||||
|
|
||||||
|
for (i = 0; i < filter->num_elements; i++) {
|
||||||
|
els = filter->elements[i];
|
||||||
|
if (els->field.type == typ) {
|
||||||
|
*lim = els->elements[0].integer;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ndb_query(struct ndb *ndb, struct ndb_filter **filters, int num_filters)
|
static int ndb_filter_get_limit(struct ndb_filter *filter, uint64_t *lim)
|
||||||
{
|
{
|
||||||
struct ndb_filter_group group;
|
return ndb_filter_int(filter, NDB_FILTER_LIMIT, lim);
|
||||||
ndb_filter_group_init(&group);
|
}
|
||||||
|
|
||||||
|
static int ndb_filter_get_until(struct ndb_filter *filter, uint64_t *lim)
|
||||||
|
{
|
||||||
|
return ndb_filter_int(filter, NDB_FILTER_UNTIL, lim);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int ndb_filter_get_since(struct ndb_filter *filter, uint64_t *lim)
|
||||||
|
{
|
||||||
|
return ndb_filter_int(filter, NDB_FILTER_SINCE, lim);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int ndb_query_filter_id(struct ndb_txn *txn, struct ndb_filter *filter,
|
||||||
|
MDB_cursor *cur, const unsigned char *id,
|
||||||
|
uint64_t since, int *matched,
|
||||||
|
struct ndb_query_result *res)
|
||||||
|
{
|
||||||
|
MDB_val k, v;
|
||||||
|
uint64_t note_id;
|
||||||
|
struct ndb_tsid tsid, *ptsid;
|
||||||
|
|
||||||
|
res->note = NULL;
|
||||||
|
|
||||||
|
ndb_tsid_init(&tsid, (unsigned char *)id, since);
|
||||||
|
|
||||||
|
k.mv_data = &tsid;
|
||||||
|
k.mv_size = sizeof(tsid);
|
||||||
|
|
||||||
|
if (!ndb_cursor_start(cur, &k, &v))
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
ptsid = (struct ndb_tsid *)k.mv_data;
|
||||||
|
note_id = *(uint64_t*)v.mv_data;
|
||||||
|
|
||||||
|
if (memcmp(id, ptsid->id, 32) == 0)
|
||||||
|
*matched |= 1 << NDB_FILTER_AUTHORS;
|
||||||
|
else
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
// get the note because we need it to match against the filter
|
||||||
|
if (!(res->note = ndb_get_note_by_key(txn, note_id, NULL)))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
// Sure this particular lookup matched the index query, but does it
|
||||||
|
// match the entire filter? Check! We also pass in things we've already
|
||||||
|
// matched via the filter so we don't have to check again. This can be
|
||||||
|
// pretty important for filters with a large number of entries.
|
||||||
|
if (!ndb_filter_matches_with(filter, res->note, *matched))
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int push_query_result(struct cursor *res,
|
||||||
|
struct ndb_query_result *result)
|
||||||
|
{
|
||||||
|
return cursor_push(res, (unsigned char*)result, sizeof(*result));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int compare_query_results(const void *pa, const void *pb)
|
||||||
|
{
|
||||||
|
struct ndb_query_result *a, *b;
|
||||||
|
|
||||||
|
a = (struct ndb_query_result *)pa;
|
||||||
|
b = (struct ndb_query_result *)pb;
|
||||||
|
|
||||||
|
if (a->note->created_at == b->note->created_at) {
|
||||||
|
return memcmp(a->note->id, b->note->id, 32);
|
||||||
|
} else if (a->note->created_at > b->note->created_at) {
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int ndb_query_filter(struct ndb_txn *txn, struct ndb_filter *filter,
|
||||||
|
struct ndb_query_result *results, int capacity,
|
||||||
|
int *results_out)
|
||||||
|
{
|
||||||
|
struct ndb_filter_elements *els;
|
||||||
|
struct ndb_query_result res;
|
||||||
|
struct cursor results_arr;
|
||||||
|
uint64_t limit, since, until;
|
||||||
|
const unsigned char *id;
|
||||||
|
int i, k, rc;
|
||||||
|
MDB_cursor *cur;
|
||||||
|
MDB_dbi db;
|
||||||
|
|
||||||
|
since = UINT64_MAX;
|
||||||
|
until = UINT64_MAX;
|
||||||
|
limit = capacity;
|
||||||
|
|
||||||
|
ndb_filter_get_limit(filter, &limit);
|
||||||
|
ndb_filter_get_since(filter, &since);
|
||||||
|
ndb_filter_get_until(filter, &until);
|
||||||
|
|
||||||
|
limit = min(capacity, limit);
|
||||||
|
make_cursor((unsigned char *)results,
|
||||||
|
((unsigned char *)results) + limit * sizeof(*results),
|
||||||
|
&results_arr);
|
||||||
|
|
||||||
|
for (i = 0; i < filter->num_elements; i++) {
|
||||||
|
if (results_arr.p >= results_arr.end)
|
||||||
|
goto done;
|
||||||
|
|
||||||
|
els = filter->elements[i];
|
||||||
|
switch (els->field.type) {
|
||||||
|
case NDB_FILTER_IDS:
|
||||||
|
int matched = 0;
|
||||||
|
db = txn->lmdb->dbs[NDB_DB_NOTE_ID];
|
||||||
|
if ((rc = mdb_cursor_open(txn->mdb_txn, db, &cur)))
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
// for each id in our ids filter, find in the db
|
||||||
|
for (k = 0; k < els->count; k++) {
|
||||||
|
if (results_arr.p >= results_arr.end) {
|
||||||
|
mdb_cursor_close(cur);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
|
||||||
|
id = els->elements[k].id;
|
||||||
|
if (!(rc = ndb_query_filter_id(txn, filter, cur,
|
||||||
|
id, since,
|
||||||
|
&matched,
|
||||||
|
&res))) {
|
||||||
|
// there was a fatal error
|
||||||
|
mdb_cursor_close(cur);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// no match, just try next id
|
||||||
|
if (rc == 1)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// rc > 1, matched!
|
||||||
|
if (!push_query_result(&results_arr, &res)) {
|
||||||
|
// this should never happen, but if
|
||||||
|
// it fails to push that means there
|
||||||
|
// are no more result to push,
|
||||||
|
// so just return
|
||||||
|
mdb_cursor_close(cur);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
|
||||||
|
// look for more ids... continue!
|
||||||
|
}
|
||||||
|
|
||||||
|
mdb_cursor_close(cur);
|
||||||
|
break;
|
||||||
|
case NDB_FILTER_AUTHORS:
|
||||||
|
break;
|
||||||
|
case NDB_FILTER_KINDS:
|
||||||
|
break;
|
||||||
|
case NDB_FILTER_GENERIC:
|
||||||
|
break;
|
||||||
|
case NDB_FILTER_SINCE:
|
||||||
|
case NDB_FILTER_UNTIL:
|
||||||
|
case NDB_FILTER_LIMIT:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
done:
|
||||||
|
*results_out = cursor_count(&results_arr, sizeof(*results));
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters,
|
||||||
|
struct ndb_query_result *results, int result_capacity, int *count)
|
||||||
|
{
|
||||||
|
int i, out;
|
||||||
|
struct ndb_query_result *p = results;
|
||||||
|
|
||||||
|
*count = 0;
|
||||||
|
|
||||||
|
for (i = 0; i < num_filters; i++) {
|
||||||
|
if (!ndb_query_filter(txn, &filters[i], p,
|
||||||
|
result_capacity, &out)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
*count += out;
|
||||||
|
p += out;
|
||||||
|
result_capacity -= out;
|
||||||
|
if (result_capacity <= 0)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// sort results
|
||||||
|
qsort(results, *count, sizeof(*results), compare_query_results);
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note,
|
static int ndb_write_note_kind_index(struct ndb_txn *txn, struct ndb_note *note,
|
||||||
uint64_t note_key)
|
uint64_t note_key)
|
||||||
|
|||||||
@@ -397,6 +397,10 @@ struct ndb_block_iterator {
|
|||||||
unsigned char *p;
|
unsigned char *p;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct ndb_query_result {
|
||||||
|
struct ndb_note *note;
|
||||||
|
};
|
||||||
|
|
||||||
// CONFIG
|
// CONFIG
|
||||||
void ndb_default_config(struct ndb_config *);
|
void ndb_default_config(struct ndb_config *);
|
||||||
void ndb_config_set_ingest_threads(struct ndb_config *config, int threads);
|
void ndb_config_set_ingest_threads(struct ndb_config *config, int threads);
|
||||||
@@ -476,7 +480,7 @@ void ndb_text_search_config_set_order(struct ndb_text_search_config *, enum ndb_
|
|||||||
void ndb_text_search_config_set_limit(struct ndb_text_search_config *, int limit);
|
void ndb_text_search_config_set_limit(struct ndb_text_search_config *, int limit);
|
||||||
|
|
||||||
// QUERY
|
// QUERY
|
||||||
void ndb_query(struct ndb_filter **, int num_filters);
|
int ndb_query(struct ndb_txn *txn, struct ndb_filter *filters, int num_filters, struct ndb_query_result *results, int result_capacity, int *count);
|
||||||
|
|
||||||
// STATS
|
// STATS
|
||||||
int ndb_stat(struct ndb *ndb, struct ndb_stat *stat);
|
int ndb_stat(struct ndb *ndb, struct ndb_stat *stat);
|
||||||
|
|||||||
Reference in New Issue
Block a user