nostrdb: make the subscription monitor threadsafe

This was the only thing that wasn't threadsafe. Add a simple mutex
instead of a queue so that polling is quick.

This also means we can't really return the internal subscriptions
anymore, so we remove that for now until we have a safer
interface.

Fixes: https://github.com/damus-io/nostrdb/issues/55
Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2024-12-09 14:48:31 -08:00
committed by Daniel D’Aquino
parent 6818d001f2
commit 0916b14b32
2 changed files with 88 additions and 49 deletions

View File

@@ -170,9 +170,10 @@ struct ndb_writer {
}; };
struct ndb_ingester { struct ndb_ingester {
struct ndb_lmdb *lmdb;
uint32_t flags; uint32_t flags;
struct threadpool tp; struct threadpool tp;
struct ndb_writer *writer; struct prot_queue *writer_inbox;
void *filter_context; void *filter_context;
ndb_ingest_filter_fn filter; ndb_ingest_filter_fn filter;
}; };
@@ -193,6 +194,11 @@ struct ndb_monitor {
ndb_sub_fn sub_cb; ndb_sub_fn sub_cb;
void *sub_cb_ctx; void *sub_cb_ctx;
int num_subscriptions; int num_subscriptions;
// monitor isn't a full inbox. We want pollers to be able to poll
// subscriptions efficiently without going through a message queue, so
// we use a simple mutex here.
pthread_mutex_t mutex;
}; };
struct ndb { struct ndb {
@@ -1721,13 +1727,6 @@ int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
return 1; return 1;
} }
static inline int ndb_writer_queue_msgs(struct ndb_writer *writer,
struct ndb_writer_msg *msgs,
int num_msgs)
{
return prot_queue_push_all(&writer->inbox, msgs, num_msgs);
}
static int ndb_writer_queue_note(struct ndb_writer *writer, static int ndb_writer_queue_note(struct ndb_writer *writer,
struct ndb_note *note, size_t note_len) struct ndb_note *note, size_t note_len)
{ {
@@ -2200,7 +2199,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
// we will use this to check if we already have it in the DB during // we will use this to check if we already have it in the DB during
// ID parsing // ID parsing
controller.read_txn = read_txn; controller.read_txn = read_txn;
controller.lmdb = ingester->writer->lmdb; controller.lmdb = ingester->lmdb;
cb.fn = ndb_ingester_json_controller; cb.fn = ndb_ingester_json_controller;
cb.data = &controller; cb.data = &controller;
@@ -3928,6 +3927,14 @@ static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
//fprintf(stderr, "writing version %" PRIu64 "\n", version); //fprintf(stderr, "writing version %" PRIu64 "\n", version);
} }
static void ndb_monitor_lock(struct ndb_monitor *mon) {
pthread_mutex_lock(&mon->mutex);
}
static void ndb_monitor_unlock(struct ndb_monitor *mon) {
pthread_mutex_unlock(&mon->mutex);
}
struct written_note { struct written_note {
uint64_t note_id; uint64_t note_id;
struct ndb_writer_note *note; struct ndb_writer_note *note;
@@ -3945,6 +3952,8 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
struct ndb_note *note; struct ndb_note *note;
struct ndb_subscription *sub; struct ndb_subscription *sub;
ndb_monitor_lock(monitor);
for (i = 0; i < monitor->num_subscriptions; i++) { for (i = 0; i < monitor->num_subscriptions; i++) {
sub = &monitor->subscriptions[i]; sub = &monitor->subscriptions[i];
ndb_debug("checking subscription %d, %d notes\n", i, num_notes); ndb_debug("checking subscription %d, %d notes\n", i, num_notes);
@@ -3975,6 +3984,8 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
monitor->sub_cb(monitor->sub_cb_ctx, sub->subid); monitor->sub_cb(monitor->sub_cb_ctx, sub->subid);
} }
} }
ndb_monitor_unlock(monitor);
} }
static void *ndb_writer_thread(void *data) static void *ndb_writer_thread(void *data)
@@ -4117,7 +4128,7 @@ static void *ndb_ingester_thread(void *data)
secp256k1_context *ctx; secp256k1_context *ctx;
struct thread *thread = data; struct thread *thread = data;
struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx; struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx;
struct ndb_lmdb *lmdb = ingester->writer->lmdb; struct ndb_lmdb *lmdb = ingester->lmdb;
struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg; struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg;
struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out; struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out;
int i, to_write, popped, done, any_event; int i, to_write, popped, done, any_event;
@@ -4172,7 +4183,7 @@ static void *ndb_ingester_thread(void *data)
if (to_write > 0) { if (to_write > 0) {
ndb_debug("pushing %d events to write queue\n", to_write); ndb_debug("pushing %d events to write queue\n", to_write);
if (!ndb_writer_queue_msgs(ingester->writer, outs, to_write)) { if (!prot_queue_push_all(ingester->writer_inbox, outs, to_write)) {
ndb_debug("failed pushing %d events to write queue\n", to_write); ndb_debug("failed pushing %d events to write queue\n", to_write);
} }
} }
@@ -4212,7 +4223,8 @@ static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb,
// initialize the ingester queue and then spawn the thread // initialize the ingester queue and then spawn the thread
static int ndb_ingester_init(struct ndb_ingester *ingester, static int ndb_ingester_init(struct ndb_ingester *ingester,
struct ndb_writer *writer, struct ndb_lmdb *lmdb,
struct prot_queue *writer_inbox,
const struct ndb_config *config) const struct ndb_config *config)
{ {
int elem_size, num_elems; int elem_size, num_elems;
@@ -4222,7 +4234,8 @@ static int ndb_ingester_init(struct ndb_ingester *ingester,
elem_size = sizeof(struct ndb_ingester_msg); elem_size = sizeof(struct ndb_ingester_msg);
num_elems = DEFAULT_QUEUE_SIZE; num_elems = DEFAULT_QUEUE_SIZE;
ingester->writer = writer; ingester->writer_inbox = writer_inbox;
ingester->lmdb = lmdb;
ingester->flags = config->flags; ingester->flags = config->flags;
ingester->filter = config->ingest_filter; ingester->filter = config->ingest_filter;
ingester->filter_context = config->filter_context; ingester->filter_context = config->filter_context;
@@ -4448,6 +4461,7 @@ static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb,
monitor->num_subscriptions = 0; monitor->num_subscriptions = 0;
monitor->sub_cb = cb; monitor->sub_cb = cb;
monitor->sub_cb_ctx = sub_cb_ctx; monitor->sub_cb_ctx = sub_cb_ctx;
pthread_mutex_init(&monitor->mutex, NULL);
} }
void ndb_filter_group_destroy(struct ndb_filter_group *group) void ndb_filter_group_destroy(struct ndb_filter_group *group)
@@ -4499,7 +4513,7 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c
return 0; return 0;
} }
if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) { if (!ndb_ingester_init(&ndb->ingester, &ndb->lmdb, &ndb->writer.inbox, config)) {
fprintf(stderr, "failed to initialize %d ingester thread(s)\n", fprintf(stderr, "failed to initialize %d ingester thread(s)\n",
config->ingester_threads); config->ingester_threads);
return 0; return 0;
@@ -6544,13 +6558,15 @@ struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, u
return blocks; return blocks;
} }
struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid, int *index) // please call ndb_monitor_lock before calling this
static struct ndb_subscription *
ndb_monitor_find_subscription(struct ndb_monitor *monitor, uint64_t subid, int *index)
{ {
struct ndb_subscription *sub, *tsub; struct ndb_subscription *sub, *tsub;
int i; int i;
for (i = 0, sub = NULL; i < ndb->monitor.num_subscriptions; i++) { for (i = 0, sub = NULL; i < monitor->num_subscriptions; i++) {
tsub = &ndb->monitor.subscriptions[i]; tsub = &monitor->subscriptions[i];
if (tsub->subid == subid) { if (tsub->subid == subid) {
sub = tsub; sub = tsub;
if (index) if (index)
@@ -6566,38 +6582,63 @@ int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
int note_id_capacity) int note_id_capacity)
{ {
struct ndb_subscription *sub; struct ndb_subscription *sub;
int res;
if (subid == 0) if (subid == 0)
return 0; return 0;
if (!(sub = ndb_find_subscription(ndb, subid, NULL))) ndb_monitor_lock(&ndb->monitor);
return 0;
return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity); if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL)))
res = 0;
else
res = prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
ndb_monitor_unlock(&ndb->monitor);
return res;
} }
int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
int note_id_capacity) int note_id_capacity)
{ {
struct ndb_subscription *sub; struct ndb_subscription *sub;
struct prot_queue *queue_inbox;
// this is not a valid subscription id // this is not a valid subscription id
if (subid == 0) if (subid == 0)
return 0; return 0;
if (!(sub = ndb_find_subscription(ndb, subid, NULL))) ndb_monitor_lock(&ndb->monitor);
return 0;
return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity); if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) {
ndb_monitor_unlock(&ndb->monitor);
return 0;
}
queue_inbox = &sub->inbox;
ndb_monitor_unlock(&ndb->monitor);
// there is technically a race condition if the thread yeilds at this
// comment and a subscription is added/removed. A deadlock in the
// writer queue would be much worse though. This function is dubious
// anyways.
return prot_queue_pop_all(queue_inbox, note_ids, note_id_capacity);
} }
int ndb_unsubscribe(struct ndb *ndb, uint64_t subid) int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
{ {
struct ndb_subscription *sub; struct ndb_subscription *sub;
int index, elems_to_move; int index, res, elems_to_move;
if (!(sub = ndb_find_subscription(ndb, subid, &index))) ndb_monitor_lock(&ndb->monitor);
return 0;
if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, &index))) {
res = 0;
goto done;
}
ndb_subscription_destroy(sub); ndb_subscription_destroy(sub);
@@ -6607,21 +6648,12 @@ int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
&ndb->monitor.subscriptions[index+1], &ndb->monitor.subscriptions[index+1],
elems_to_move * sizeof(*sub)); elems_to_move * sizeof(*sub));
return 1; res = 1;
}
struct ndb_filter *ndb_subscription_filters(struct ndb *ndb, uint64_t subid, int *filters) done:
{ ndb_monitor_unlock(&ndb->monitor);
struct ndb_subscription *sub;
sub = ndb_find_subscription(ndb, subid, NULL); return res;
if (sub) {
*filters = sub->group.num_filters;
return sub->group.filters;
}
*filters = 0;
return NULL;
} }
int ndb_num_subscriptions(struct ndb *ndb) int ndb_num_subscriptions(struct ndb *ndb)
@@ -6633,24 +6665,27 @@ uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filt
{ {
static uint64_t subids = 0; static uint64_t subids = 0;
struct ndb_subscription *sub; struct ndb_subscription *sub;
int index;
size_t buflen; size_t buflen;
uint64_t subid; uint64_t subid;
char *buf; char *buf;
ndb_monitor_lock(&ndb->monitor);
if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) { if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) {
fprintf(stderr, "too many subscriptions\n"); fprintf(stderr, "too many subscriptions\n");
return 0; subid = 0;
goto done;
} }
index = ndb->monitor.num_subscriptions++; sub = &ndb->monitor.subscriptions[ndb->monitor.num_subscriptions];
sub = &ndb->monitor.subscriptions[index];
subid = ++subids; subid = ++subids;
sub->subid = subid; sub->subid = subid;
ndb_filter_group_init(&sub->group); ndb_filter_group_init(&sub->group);
if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) {
return 0; subid = 0;
goto done;
}
// 500k ought to be enough for anyone // 500k ought to be enough for anyone
buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE; buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE;
@@ -6658,8 +6693,13 @@ uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filt
if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) { if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) {
fprintf(stderr, "failed to push prot queue\n"); fprintf(stderr, "failed to push prot queue\n");
return 0; subid = 0;
goto done;
} }
ndb->monitor.num_subscriptions++;
done:
ndb_monitor_unlock(&ndb->monitor);
return subid; return subid;
} }

View File

@@ -532,7 +532,6 @@ int ndb_wait_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int not
int ndb_poll_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity); int ndb_poll_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity);
int ndb_unsubscribe(struct ndb *, uint64_t subid); int ndb_unsubscribe(struct ndb *, uint64_t subid);
int ndb_num_subscriptions(struct ndb *); int ndb_num_subscriptions(struct ndb *);
struct ndb_filter *ndb_subscription_filters(struct ndb *, uint64_t subid, int *filters);
// FULLTEXT SEARCH // FULLTEXT SEARCH
int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *, struct ndb_text_search_config *); int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *, struct ndb_text_search_config *);