nostrdb: add ndb_unsubscribe
We didn't have a way to unsubscribe from subscriptions. Now we do! Apps like notecrumbs may open up many local subscriptions based on incoming requests. We may need to make the MAX_SUBSCRIPTIONS size much larger, but this should be okish for now. Changelog-Added: Add ndb_unsubscribe to unsubscribe from subscriptions Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
committed by
Daniel D’Aquino
parent
0ac03df841
commit
30c9bc7db7
@@ -35,7 +35,7 @@
|
|||||||
static const int THREAD_QUEUE_BATCH = 4096;
|
static const int THREAD_QUEUE_BATCH = 4096;
|
||||||
|
|
||||||
// maximum number of active subscriptions
|
// maximum number of active subscriptions
|
||||||
#define MAX_SUBSCRIPTIONS 32
|
#define MAX_SUBSCRIPTIONS 256
|
||||||
#define MAX_SCAN_CURSORS 12
|
#define MAX_SCAN_CURSORS 12
|
||||||
#define MAX_FILTERS 16
|
#define MAX_FILTERS 16
|
||||||
|
|
||||||
@@ -4236,7 +4236,7 @@ static int ndb_run_migrations(struct ndb *ndb)
|
|||||||
|
|
||||||
static void ndb_monitor_init(struct ndb_monitor *monitor)
|
static void ndb_monitor_init(struct ndb_monitor *monitor)
|
||||||
{
|
{
|
||||||
monitor->num_subscriptions = 0;
|
memset(monitor, 0, sizeof(*monitor));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ndb_filter_group_destroy(struct ndb_filter_group *group)
|
void ndb_filter_group_destroy(struct ndb_filter_group *group)
|
||||||
@@ -4249,18 +4249,19 @@ void ndb_filter_group_destroy(struct ndb_filter_group *group)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void ndb_subscription_destroy(struct ndb_subscription *sub)
|
||||||
|
{
|
||||||
|
ndb_filter_group_destroy(&sub->group);
|
||||||
|
prot_queue_destroy(&sub->inbox);
|
||||||
|
sub->subid = 0;
|
||||||
|
}
|
||||||
|
|
||||||
static void ndb_monitor_destroy(struct ndb_monitor *monitor)
|
static void ndb_monitor_destroy(struct ndb_monitor *monitor)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
struct ndb_subscription *sub;
|
|
||||||
struct ndb_filter_group *group;
|
|
||||||
|
|
||||||
for (i = 0; i < monitor->num_subscriptions; i++) {
|
for (i = 0; i < monitor->num_subscriptions; i++) {
|
||||||
sub = &monitor->subscriptions[i];
|
ndb_subscription_destroy(&monitor->subscriptions[i]);
|
||||||
group = &sub->group;
|
|
||||||
|
|
||||||
ndb_filter_group_destroy(group);
|
|
||||||
prot_queue_destroy(&sub->inbox);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -5867,7 +5868,7 @@ struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, u
|
|||||||
return blocks;
|
return blocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid)
|
struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid, int *index)
|
||||||
{
|
{
|
||||||
struct ndb_subscription *sub, *tsub;
|
struct ndb_subscription *sub, *tsub;
|
||||||
int i;
|
int i;
|
||||||
@@ -5876,6 +5877,8 @@ struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid)
|
|||||||
tsub = &ndb->monitor.subscriptions[i];
|
tsub = &ndb->monitor.subscriptions[i];
|
||||||
if (tsub->subid == subid) {
|
if (tsub->subid == subid) {
|
||||||
sub = tsub;
|
sub = tsub;
|
||||||
|
if (index)
|
||||||
|
*index = i;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -5891,7 +5894,7 @@ int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
|
|||||||
if (subid == 0)
|
if (subid == 0)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
if (!(sub = ndb_find_subscription(ndb, subid)))
|
if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
|
return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
|
||||||
@@ -5906,12 +5909,36 @@ int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
|
|||||||
if (subid == 0)
|
if (subid == 0)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
if (!(sub = ndb_find_subscription(ndb, subid)))
|
if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity);
|
return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
|
||||||
|
{
|
||||||
|
struct ndb_subscription *sub;
|
||||||
|
int index, elems_to_move;
|
||||||
|
|
||||||
|
if (!(sub = ndb_find_subscription(ndb, subid, &index)))
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
ndb_subscription_destroy(sub);
|
||||||
|
|
||||||
|
elems_to_move = (--ndb->monitor.num_subscriptions) - index;
|
||||||
|
|
||||||
|
memmove(&ndb->monitor.subscriptions[index],
|
||||||
|
&ndb->monitor.subscriptions[index+1],
|
||||||
|
elems_to_move * sizeof(*sub));
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ndb_num_subscriptions(struct ndb *ndb)
|
||||||
|
{
|
||||||
|
return ndb->monitor.num_subscriptions;
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filters)
|
uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filters)
|
||||||
{
|
{
|
||||||
static uint64_t subids = 0;
|
static uint64_t subids = 0;
|
||||||
|
|||||||
@@ -497,7 +497,8 @@ void ndb_filter_destroy(struct ndb_filter *);
|
|||||||
uint64_t ndb_subscribe(struct ndb *, struct ndb_filter *, int num_filters);
|
uint64_t ndb_subscribe(struct ndb *, struct ndb_filter *, int num_filters);
|
||||||
int ndb_wait_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity);
|
int ndb_wait_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity);
|
||||||
int ndb_poll_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity);
|
int ndb_poll_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity);
|
||||||
int ndb_unsubscribe(int subid);
|
int ndb_unsubscribe(struct ndb *, uint64_t subid);
|
||||||
|
int ndb_num_subscriptions(struct ndb *);
|
||||||
|
|
||||||
// FULLTEXT SEARCH
|
// FULLTEXT SEARCH
|
||||||
int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *, struct ndb_text_search_config *);
|
int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *, struct ndb_text_search_config *);
|
||||||
|
|||||||
Reference in New Issue
Block a user