nostrdb: add ability to register a subscription callback
Since Damus iOS is not an immediate-mode UI like android, we would rather not poll for results. Instead we need a way to register a callback function that is called when we get new subscription results. This is also useful on the android side, allowing us to request a new frame to draw when we have new results, instead of drawing every second. Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
committed by
Daniel D’Aquino
parent
30c9bc7db7
commit
a562be009d
@@ -191,6 +191,8 @@ struct ndb_subscription {
|
|||||||
|
|
||||||
struct ndb_monitor {
|
struct ndb_monitor {
|
||||||
struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS];
|
struct ndb_subscription subscriptions[MAX_SUBSCRIPTIONS];
|
||||||
|
ndb_sub_fn sub_cb;
|
||||||
|
void *sub_cb_ctx;
|
||||||
int num_subscriptions;
|
int num_subscriptions;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -3738,6 +3740,7 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
|
|||||||
struct written_note *wrote, int num_notes)
|
struct written_note *wrote, int num_notes)
|
||||||
{
|
{
|
||||||
int i, k;
|
int i, k;
|
||||||
|
int pushed;
|
||||||
struct written_note *written;
|
struct written_note *written;
|
||||||
struct ndb_note *note;
|
struct ndb_note *note;
|
||||||
struct ndb_subscription *sub;
|
struct ndb_subscription *sub;
|
||||||
@@ -3746,20 +3749,31 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
|
|||||||
sub = &monitor->subscriptions[i];
|
sub = &monitor->subscriptions[i];
|
||||||
ndb_debug("checking subscription %d, %d notes\n", i, num_notes);
|
ndb_debug("checking subscription %d, %d notes\n", i, num_notes);
|
||||||
|
|
||||||
|
pushed = 0;
|
||||||
for (k = 0; k < num_notes; k++) {
|
for (k = 0; k < num_notes; k++) {
|
||||||
written = &wrote[k];
|
written = &wrote[k];
|
||||||
note = written->note->note;
|
note = written->note->note;
|
||||||
|
|
||||||
if (ndb_filter_group_matches(&sub->group, note)) {
|
if (ndb_filter_group_matches(&sub->group, note)) {
|
||||||
ndb_debug("pushing note\n");
|
ndb_debug("pushing note\n");
|
||||||
|
|
||||||
if (!prot_queue_push(&sub->inbox, &written->note_id)) {
|
if (!prot_queue_push(&sub->inbox, &written->note_id)) {
|
||||||
ndb_debug("couldn't push note to subscriber");
|
ndb_debug("couldn't push note to subscriber");
|
||||||
|
} else {
|
||||||
|
pushed++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ndb_debug("not pushing note\n");
|
ndb_debug("not pushing note\n");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// After pushing all of the matching notes, check to see if we
|
||||||
|
// have a registered subscription callback. If so, we call it.
|
||||||
|
// The callback needs to call ndb_poll_for_notes to pull data
|
||||||
|
// that was just pushed to the queue in the for loop above.
|
||||||
|
if (monitor->sub_cb != NULL && pushed > 0) {
|
||||||
|
monitor->sub_cb(monitor->sub_cb_ctx, sub->subid);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -4234,9 +4248,12 @@ static int ndb_run_migrations(struct ndb *ndb)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ndb_monitor_init(struct ndb_monitor *monitor)
|
static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb,
|
||||||
|
void *sub_cb_ctx)
|
||||||
{
|
{
|
||||||
memset(monitor, 0, sizeof(*monitor));
|
monitor->num_subscriptions = 0;
|
||||||
|
monitor->sub_cb = cb;
|
||||||
|
monitor->sub_cb_ctx = sub_cb_ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ndb_filter_group_destroy(struct ndb_filter_group *group)
|
void ndb_filter_group_destroy(struct ndb_filter_group *group)
|
||||||
@@ -4281,7 +4298,7 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c
|
|||||||
if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize))
|
if (!ndb_init_lmdb(filename, &ndb->lmdb, config->mapsize))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
ndb_monitor_init(&ndb->monitor);
|
ndb_monitor_init(&ndb->monitor, config->sub_cb, config->sub_cb_ctx);
|
||||||
|
|
||||||
if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor)) {
|
if (!ndb_writer_init(&ndb->writer, &ndb->lmdb, &ndb->monitor)) {
|
||||||
fprintf(stderr, "ndb_writer_init failed\n");
|
fprintf(stderr, "ndb_writer_init failed\n");
|
||||||
@@ -5496,6 +5513,14 @@ void ndb_default_config(struct ndb_config *config)
|
|||||||
config->flags = 0;
|
config->flags = 0;
|
||||||
config->ingest_filter = NULL;
|
config->ingest_filter = NULL;
|
||||||
config->filter_context = NULL;
|
config->filter_context = NULL;
|
||||||
|
config->sub_cb_ctx = NULL;
|
||||||
|
config->sub_cb = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *context)
|
||||||
|
{
|
||||||
|
config->sub_cb_ctx = context;
|
||||||
|
config->sub_cb = fn;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ndb_config_set_ingest_threads(struct ndb_config *config, int threads)
|
void ndb_config_set_ingest_threads(struct ndb_config *config, int threads)
|
||||||
|
|||||||
@@ -64,6 +64,9 @@ struct ndb_keypair {
|
|||||||
// function pointer for controlling what to do after we parse an id
|
// function pointer for controlling what to do after we parse an id
|
||||||
typedef enum ndb_idres (*ndb_id_fn)(void *, const char *);
|
typedef enum ndb_idres (*ndb_id_fn)(void *, const char *);
|
||||||
|
|
||||||
|
// callback function for when we receive new subscription results
|
||||||
|
typedef void (*ndb_sub_fn)(void *, uint64_t subid);
|
||||||
|
|
||||||
// id callback + closure data
|
// id callback + closure data
|
||||||
struct ndb_id_cb {
|
struct ndb_id_cb {
|
||||||
ndb_id_fn fn;
|
ndb_id_fn fn;
|
||||||
@@ -256,6 +259,8 @@ struct ndb_config {
|
|||||||
size_t mapsize;
|
size_t mapsize;
|
||||||
void *filter_context;
|
void *filter_context;
|
||||||
ndb_ingest_filter_fn ingest_filter;
|
ndb_ingest_filter_fn ingest_filter;
|
||||||
|
void *sub_cb_ctx;
|
||||||
|
ndb_sub_fn sub_cb;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ndb_text_search_config {
|
struct ndb_text_search_config {
|
||||||
@@ -430,6 +435,7 @@ void ndb_config_set_ingest_threads(struct ndb_config *config, int threads);
|
|||||||
void ndb_config_set_flags(struct ndb_config *config, int flags);
|
void ndb_config_set_flags(struct ndb_config *config, int flags);
|
||||||
void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize);
|
void ndb_config_set_mapsize(struct ndb_config *config, size_t mapsize);
|
||||||
void ndb_config_set_ingest_filter(struct ndb_config *config, ndb_ingest_filter_fn fn, void *);
|
void ndb_config_set_ingest_filter(struct ndb_config *config, ndb_ingest_filter_fn fn, void *);
|
||||||
|
void ndb_config_set_subscription_callback(struct ndb_config *config, ndb_sub_fn fn, void *ctx);
|
||||||
|
|
||||||
// HELPERS
|
// HELPERS
|
||||||
int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen);
|
int ndb_calculate_id(struct ndb_note *note, unsigned char *buf, int buflen);
|
||||||
|
|||||||
Reference in New Issue
Block a user