nostrdb/ingest: add configurable ingest filter
This allows users of nostrdb to selectively filter notes of any kind during ingest. Contact lists too big? Create a filter to reject them. You only care about notes with specific kinds? Reject everything else. Damus will use this for rejecting large events that might take up too much space for storage, such as contact lists. This commit also switched to ndb_config for configuring nostrdb, because the arguments to ndb_init were getting out of hand. Changelog-Added: Added ingest filter setting Changelog-Changed: Switch to ndb_config for per-session ndb settings Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
@@ -7,6 +7,7 @@
|
||||
#include "sha256.h"
|
||||
#include "lmdb.h"
|
||||
#include "util.h"
|
||||
#include "cpu.h"
|
||||
#include "threadpool.h"
|
||||
#include "protected_queue.h"
|
||||
#include "memchr.h"
|
||||
@@ -115,6 +116,8 @@ struct ndb_ingester {
|
||||
uint32_t flags;
|
||||
struct threadpool tp;
|
||||
struct ndb_writer *writer;
|
||||
void *filter_context;
|
||||
ndb_ingest_filter_fn filter;
|
||||
};
|
||||
|
||||
|
||||
@@ -1577,16 +1580,23 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
|
||||
struct ndb_note *note,
|
||||
size_t note_size,
|
||||
struct ndb_writer_msg *out,
|
||||
uint32_t flags)
|
||||
struct ndb_ingester *ingester)
|
||||
{
|
||||
//printf("ndb_ingester_process_note ");
|
||||
//print_hex(note->id, 32);
|
||||
//printf("\n");
|
||||
enum ndb_ingest_filter_action action;
|
||||
action = NDB_INGEST_ACCEPT;
|
||||
|
||||
if (ingester->filter)
|
||||
action = ingester->filter(ingester->filter_context, note);
|
||||
|
||||
if (action == NDB_INGEST_REJECT)
|
||||
return 0;
|
||||
|
||||
// some special situations we might want to skip sig validation,
|
||||
// like during large imports
|
||||
if (!(flags & NDB_FLAG_SKIP_NOTE_VERIFY)) {
|
||||
// Verify! If it's an invalid note we don't need to
|
||||
if (action == NDB_INGEST_SKIP_VALIDATION || (ingester->flags & NDB_FLAG_SKIP_NOTE_VERIFY)) {
|
||||
// if we're skipping validation we don't need to verify
|
||||
} else {
|
||||
// verify! If it's an invalid note we don't need to
|
||||
// bother writing it to the database
|
||||
if (!ndb_note_verify(ctx, note->pubkey, note->id, note->sig)) {
|
||||
ndb_debug("signature verification failed\n");
|
||||
@@ -1678,7 +1688,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
|
||||
}
|
||||
|
||||
if (!ndb_ingester_process_note(ctx, note, note_size,
|
||||
out, ingester->flags)) {
|
||||
out, ingester)) {
|
||||
goto cleanup;
|
||||
} else {
|
||||
// we're done with the original json, free it
|
||||
@@ -1699,7 +1709,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
|
||||
}
|
||||
|
||||
if (!ndb_ingester_process_note(ctx, note, note_size,
|
||||
out, ingester->flags)) {
|
||||
out, ingester)) {
|
||||
goto cleanup;
|
||||
} else {
|
||||
// we're done with the original json, free it
|
||||
@@ -3000,8 +3010,8 @@ static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb)
|
||||
|
||||
// initialize the ingester queue and then spawn the thread
|
||||
static int ndb_ingester_init(struct ndb_ingester *ingester,
|
||||
struct ndb_writer *writer, int num_threads,
|
||||
int flags)
|
||||
struct ndb_writer *writer,
|
||||
struct ndb_config *config)
|
||||
{
|
||||
int elem_size, num_elems;
|
||||
static struct ndb_ingester_msg quit_msg = { .type = NDB_INGEST_QUIT };
|
||||
@@ -3011,10 +3021,13 @@ static int ndb_ingester_init(struct ndb_ingester *ingester,
|
||||
num_elems = DEFAULT_QUEUE_SIZE;
|
||||
|
||||
ingester->writer = writer;
|
||||
ingester->flags = flags;
|
||||
ingester->flags = config->flags;
|
||||
ingester->filter = config->ingest_filter;
|
||||
ingester->filter_context = config->filter_context;
|
||||
|
||||
if (!threadpool_init(&ingester->tp, num_threads, elem_size, num_elems,
|
||||
&quit_msg, ingester, ndb_ingester_thread))
|
||||
if (!threadpool_init(&ingester->tp, config->ingester_threads,
|
||||
elem_size, num_elems, &quit_msg, ingester,
|
||||
ndb_ingester_thread))
|
||||
{
|
||||
fprintf(stderr, "ndb ingester threadpool failed to init\n");
|
||||
return 0;
|
||||
@@ -3221,20 +3234,20 @@ static int ndb_run_migrations(struct ndb *ndb)
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingester_threads, int flags)
|
||||
int ndb_init(struct ndb **pndb, const char *filename, struct ndb_config *config)
|
||||
{
|
||||
struct ndb *ndb;
|
||||
//MDB_dbi ind_id; // TODO: ind_pk, etc
|
||||
|
||||
ndb = *pndb = calloc(1, sizeof(struct ndb));
|
||||
ndb->flags = flags;
|
||||
ndb->flags = config->flags;
|
||||
|
||||
if (ndb == NULL) {
|
||||
fprintf(stderr, "ndb_init: malloc failed\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!ndb_init_lmdb(filename, &ndb->lmdb, mapsize))
|
||||
if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize))
|
||||
return 0;
|
||||
|
||||
if (!ndb_writer_init(&ndb->writer, &ndb->lmdb)) {
|
||||
@@ -3242,14 +3255,14 @@ int ndb_init(struct ndb **pndb, const char *filename, size_t mapsize, int ingest
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, ingester_threads,
|
||||
ndb->flags)) {
|
||||
if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) {
|
||||
fprintf(stderr, "failed to initialize %d ingester thread(s)\n",
|
||||
ingester_threads);
|
||||
config->ingester_threads);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!ndb_flag_set(flags, NDB_FLAG_NOMIGRATE) && !ndb_run_migrations(ndb)) {
|
||||
if (!ndb_flag_set(config->flags, NDB_FLAG_NOMIGRATE) &&
|
||||
!ndb_run_migrations(ndb)) {
|
||||
fprintf(stderr, "failed to run migrations\n");
|
||||
return 0;
|
||||
}
|
||||
@@ -4380,4 +4393,37 @@ inline int ndb_builder_push_tag_str(struct ndb_builder *builder,
|
||||
return ndb_builder_finalize_tag(builder, pstr);
|
||||
}
|
||||
|
||||
//
|
||||
// CONFIG
|
||||
//
|
||||
void ndb_default_config(struct ndb_config *config)
|
||||
{
|
||||
config->mapsize = 1024UL * 1024UL * 1024UL * 32UL; // 32 GiB
|
||||
config->ingester_threads = 4; // TODO: figure this out from platform apis
|
||||
config->flags = 0;
|
||||
config->ingest_filter = NULL;
|
||||
config->filter_context = NULL;
|
||||
}
|
||||
|
||||
void ndb_config_set_ingest_threads(struct ndb_config *config, int threads)
|
||||
{
|
||||
int cores = get_physical_cores();
|
||||
config->ingester_threads = cores == -1 ? 4 : cores;
|
||||
}
|
||||
|
||||
void ndb_config_set_flags(struct ndb_config *config, int flags)
|
||||
{
|
||||
config->flags = flags;
|
||||
}
|
||||
|
||||
void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize)
|
||||
{
|
||||
config->mapsize = mapsize;
|
||||
}
|
||||
|
||||
void ndb_config_set_ingest_filter(struct ndb_config *config,
|
||||
ndb_ingest_filter_fn fn, void *filter_ctx)
|
||||
{
|
||||
config->ingest_filter = fn;
|
||||
config->filter_context = filter_ctx;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user