nostrdb/Initial nostrdb relay subscriptions

This adds some initial code for the nostrdb relay subscription monitor.

When new notes are written to the database, they are checked against
active subscriptions. If any of the subscriptions are matched, the note
primary key is written to the inbox queue for that subscription.

We also add an ndb_wait_for_notes() method that simply waits for notes
to be written by the subscription monitor.

Changelog-Added: Added filter subscriptions
Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2023-11-26 20:04:30 -08:00
committed by Daniel D’Aquino
parent bdedf8bd8c
commit 8c5ec32eaa
3 changed files with 245 additions and 21 deletions

View File

@@ -4,6 +4,10 @@
#include <inttypes.h>
#include "cursor.h"
// how many filters are allowed in a filter group
#define NDB_MAX_FILTERS 16
// maximum number of filters allowed in a filter group
#define NDB_PACKED_STR 0x1
#define NDB_PACKED_ID 0x2
@@ -26,6 +30,7 @@ struct ndb_blocks;
struct ndb_block;
struct ndb_note;
struct ndb_tag;
struct ndb_filter_group;
struct ndb_tags;
struct ndb_lmdb;
union ndb_packed_str;
@@ -236,6 +241,11 @@ struct ndb_filter {
struct ndb_filter_elements *elements[NDB_NUM_FILTERS];
};
struct ndb_filter_group {
struct ndb_filter *filters[NDB_MAX_FILTERS];
int num_filters;
};
struct ndb_config {
int flags;
int ingester_threads;
@@ -462,12 +472,21 @@ void ndb_filter_reset(struct ndb_filter *);
void ndb_filter_end_field(struct ndb_filter *);
void ndb_filter_free(struct ndb_filter *);
// SUBSCRIPTIONS
uint64_t ndb_subscribe(struct ndb *, struct ndb_filter_group *);
int ndb_wait_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids,
int note_id_capacity);
int ndb_unsubscribe(int subid);
// FULLTEXT SEARCH
int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *, struct ndb_text_search_config *);
void ndb_default_text_search_config(struct ndb_text_search_config *);
void ndb_text_search_config_set_order(struct ndb_text_search_config *, enum ndb_search_order);
void ndb_text_search_config_set_limit(struct ndb_text_search_config *, int limit);
// QUERY
void ndb_query(struct ndb_filter **, int num_filters);
// STATS
int ndb_stat(struct ndb *ndb, struct ndb_stat *stat);
void ndb_stat_counts_init(struct ndb_stat_counts *counts);