nostrdb/subs: subs and monitor cleanup
We need to free these resources when we're done with them. Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
committed by
Daniel D’Aquino
parent
f03d8a5ac9
commit
81d65cd5bf
@@ -177,7 +177,7 @@ struct ndb_ingester {
|
|||||||
|
|
||||||
struct ndb_subscription {
|
struct ndb_subscription {
|
||||||
uint64_t subid;
|
uint64_t subid;
|
||||||
struct ndb_filter_group filter;
|
struct ndb_filter_group group;
|
||||||
struct prot_queue inbox;
|
struct prot_queue inbox;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -520,7 +520,7 @@ void ndb_filter_reset(struct ndb_filter *filter)
|
|||||||
filter->current = NULL;
|
filter->current = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ndb_filter_free(struct ndb_filter *filter)
|
void ndb_filter_destroy(struct ndb_filter *filter)
|
||||||
{
|
{
|
||||||
if (filter->elem_buf.start)
|
if (filter->elem_buf.start)
|
||||||
free(filter->elem_buf.start);
|
free(filter->elem_buf.start);
|
||||||
@@ -2889,7 +2889,7 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
|
|||||||
written = &wrote[k];
|
written = &wrote[k];
|
||||||
note = written->note->note;
|
note = written->note->note;
|
||||||
|
|
||||||
if (ndb_filter_group_matches(&sub->filter, note)) {
|
if (ndb_filter_group_matches(&sub->group, note)) {
|
||||||
ndb_debug("pushing note\n");
|
ndb_debug("pushing note\n");
|
||||||
if (!prot_queue_push(&sub->inbox, &written->note_id)) {
|
if (!prot_queue_push(&sub->inbox, &written->note_id)) {
|
||||||
ndb_debug("couldn't push note to subscriber");
|
ndb_debug("couldn't push note to subscriber");
|
||||||
@@ -3361,6 +3361,31 @@ static void ndb_monitor_init(struct ndb_monitor *monitor)
|
|||||||
monitor->num_subscriptions = 0;
|
monitor->num_subscriptions = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ndb_filter_group_destroy(struct ndb_filter_group *group)
|
||||||
|
{
|
||||||
|
struct ndb_filter *filter;
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < group->num_filters; i++) {
|
||||||
|
filter = group->filters[i];
|
||||||
|
ndb_filter_destroy(filter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void ndb_monitor_destroy(struct ndb_monitor *monitor)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
struct ndb_subscription *sub;
|
||||||
|
struct ndb_filter_group *group;
|
||||||
|
|
||||||
|
for (i = 0; i < monitor->num_subscriptions; i++) {
|
||||||
|
sub = &monitor->subscriptions[i];
|
||||||
|
group = &sub->group;
|
||||||
|
|
||||||
|
ndb_filter_group_destroy(group);
|
||||||
|
prot_queue_destroy(&sub->inbox);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config)
|
int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *config)
|
||||||
{
|
{
|
||||||
struct ndb *ndb;
|
struct ndb *ndb;
|
||||||
@@ -3408,6 +3433,7 @@ void ndb_destroy(struct ndb *ndb)
|
|||||||
// ingester depends on writer and must be destroyed first
|
// ingester depends on writer and must be destroyed first
|
||||||
ndb_ingester_destroy(&ndb->ingester);
|
ndb_ingester_destroy(&ndb->ingester);
|
||||||
ndb_writer_destroy(&ndb->writer);
|
ndb_writer_destroy(&ndb->writer);
|
||||||
|
ndb_monitor_destroy(&ndb->monitor);
|
||||||
|
|
||||||
mdb_env_close(ndb->lmdb.env);
|
mdb_env_close(ndb->lmdb.env);
|
||||||
|
|
||||||
@@ -4943,7 +4969,7 @@ uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter_group *group)
|
|||||||
subid = ++subids;
|
subid = ++subids;
|
||||||
sub->subid = subid;
|
sub->subid = subid;
|
||||||
|
|
||||||
memcpy(&sub->filter, group, sizeof(*group));
|
memcpy(&sub->group, group, sizeof(*group));
|
||||||
|
|
||||||
// 500k ought to be enough for anyone
|
// 500k ought to be enough for anyone
|
||||||
buflen = sizeof(uint64_t) * 65536;
|
buflen = sizeof(uint64_t) * 65536;
|
||||||
|
|||||||
@@ -471,7 +471,7 @@ int ndb_filter_matches(struct ndb_filter *, struct ndb_note *);
|
|||||||
void ndb_filter_reset(struct ndb_filter *);
|
void ndb_filter_reset(struct ndb_filter *);
|
||||||
void ndb_filter_end_field(struct ndb_filter *);
|
void ndb_filter_end_field(struct ndb_filter *);
|
||||||
int ndb_filter_group_add(struct ndb_filter_group *group, struct ndb_filter *f);
|
int ndb_filter_group_add(struct ndb_filter_group *group, struct ndb_filter *f);
|
||||||
void ndb_filter_free(struct ndb_filter *);
|
void ndb_filter_destroy(struct ndb_filter *);
|
||||||
|
|
||||||
// SUBSCRIPTIONS
|
// SUBSCRIPTIONS
|
||||||
uint64_t ndb_subscribe(struct ndb *, struct ndb_filter_group *);
|
uint64_t ndb_subscribe(struct ndb *, struct ndb_filter_group *);
|
||||||
|
|||||||
Reference in New Issue
Block a user