nostrdb: config: custom writer scratch size

making more things configurable if you have memory constraints

Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2025-03-19 12:57:53 -07:00
committed by Daniel D’Aquino
parent 3290e1f9d2
commit fcd8131063
2 changed files with 25 additions and 8 deletions

View File

@@ -43,6 +43,9 @@
// the maximum size of inbox queues // the maximum size of inbox queues
static const int DEFAULT_QUEUE_SIZE = 32768; static const int DEFAULT_QUEUE_SIZE = 32768;
// 2mb scratch size for the writer thread
static const int DEFAULT_WRITER_SCRATCH_SIZE = 2097152;
// increase if we need bigger filters // increase if we need bigger filters
#define NDB_FILTER_PAGES 64 #define NDB_FILTER_PAGES 64
@@ -163,6 +166,7 @@ struct ndb_writer {
struct ndb_lmdb *lmdb; struct ndb_lmdb *lmdb;
struct ndb_monitor *monitor; struct ndb_monitor *monitor;
int scratch_size;
uint32_t ndb_flags; uint32_t ndb_flags;
void *queue_buf; void *queue_buf;
int queue_buflen; int queue_buflen;
@@ -4558,15 +4562,13 @@ static void *ndb_writer_thread(void *data)
struct ndb_writer *writer = data; struct ndb_writer *writer = data;
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg; struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
struct written_note written_notes[THREAD_QUEUE_BATCH]; struct written_note written_notes[THREAD_QUEUE_BATCH];
size_t scratch_size;
int i, popped, done, needs_commit, num_notes; int i, popped, done, needs_commit, num_notes;
uint64_t note_nkey; uint64_t note_nkey;
struct ndb_txn txn; struct ndb_txn txn;
unsigned char *scratch; unsigned char *scratch;
// 8mb scratch buffer for parsing note content // 2MB scratch buffer for parsing note content
scratch_size = 8 * 1024 * 1024; scratch = malloc(writer->scratch_size);
scratch = malloc(scratch_size);
MDB_txn *mdb_txn = NULL; MDB_txn *mdb_txn = NULL;
ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn); ndb_txn_from_mdb(&txn, writer->lmdb, mdb_txn);
@@ -4615,7 +4617,7 @@ static void *ndb_writer_thread(void *data)
&txn, &txn,
&msg->profile, &msg->profile,
scratch, scratch,
scratch_size, writer->scratch_size,
writer->ndb_flags); writer->ndb_flags);
if (note_nkey > 0) { if (note_nkey > 0) {
@@ -4631,7 +4633,7 @@ static void *ndb_writer_thread(void *data)
case NDB_WRITER_NOTE: case NDB_WRITER_NOTE:
note_nkey = ndb_write_note(&txn, &msg->note, note_nkey = ndb_write_note(&txn, &msg->note,
scratch, scratch,
scratch_size, writer->scratch_size,
writer->ndb_flags); writer->ndb_flags);
if (note_nkey > 0) { if (note_nkey > 0) {
@@ -4769,11 +4771,13 @@ static void *ndb_ingester_thread(void *data)
static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb, static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb,
struct ndb_monitor *monitor, uint32_t ndb_flags) struct ndb_monitor *monitor, uint32_t ndb_flags,
int scratch_size)
{ {
writer->lmdb = lmdb; writer->lmdb = lmdb;
writer->monitor = monitor; writer->monitor = monitor;
writer->ndb_flags = ndb_flags; writer->ndb_flags = ndb_flags;
writer->scratch_size = scratch_size;
writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE; writer->queue_buflen = sizeof(struct ndb_writer_msg) * DEFAULT_QUEUE_SIZE;
writer->queue_buf = malloc(writer->queue_buflen); writer->queue_buf = malloc(writer->queue_buflen);
if (writer->queue_buf == NULL) { if (writer->queue_buf == NULL) {
@@ -5064,7 +5068,8 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c
ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx); ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx);
if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor, ndb->flags)) { if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor, ndb->flags,
config->writer_scratch_buffer_size)) {
fprintf(stderr, "ndb_writer_init failed\n"); fprintf(stderr, "ndb_writer_init failed\n");
return 0; return 0;
} }
@@ -6771,6 +6776,7 @@ void ndb_default_config(struct ndb_config *config)
config->filter_context = NULL; config->filter_context = NULL;
config->sub_cb_ctx = NULL; config->sub_cb_ctx = NULL;
config->sub_cb = NULL; config->sub_cb = NULL;
config->writer_scratch_buffer_size = DEFAULT_WRITER_SCRATCH_SIZE;
} }
void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context) void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context)
@@ -6779,6 +6785,11 @@ void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn
config->sub_cb = fn; config->sub_cb = fn;
} }
void ndb_config_set_writer_scratch_buffer_size(struct ndb_config *config, int scratch_size)
{
config->writer_scratch_buffer_size = scratch_size;
}
void ndb_config_set_ingest_threads(struct ndb_config *config, int threads) void ndb_config_set_ingest_threads(struct ndb_config *config, int threads)
{ {
config->ingester_threads = threads; config->ingester_threads = threads;

View File

@@ -273,6 +273,7 @@ struct ndb_filter {
struct ndb_config { struct ndb_config {
int flags; int flags;
int ingester_threads; int ingester_threads;
int writer_scratch_buffer_size;
size_t mapsize; size_t mapsize;
void *filter_context; void *filter_context;
ndb_ingest_filter_fn ingest_filter; ndb_ingest_filter_fn ingest_filter;
@@ -459,6 +460,11 @@ void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize);
void ndb_config_set_ingest_filter(struct ndb_config *config, ndb_ingest_filter_fn fn, void *); void ndb_config_set_ingest_filter(struct ndb_config *config, ndb_ingest_filter_fn fn, void *);
void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *ctx); void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *ctx);
/// Configurable scratch buffer size for the writer thread. Default is 2MB. If you have smaller notes
/// you can decrease this to reduce memory usage. If you have bigger notes you should increase this so
/// that the writer thread can properly parse larger notes.
void ndb_config_set_writer_scratch_buffer_size(struct ndb_config *config, int scratch_size);
// HELPERS // HELPERS
int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen); int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen);
int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32], unsigned char sig[64]); int ndb_sign_id(struct ndb_keypair *keypair, unsigned char id[32], unsigned char sig[64]);