nostrdb: resync with repo

Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
William Casarin
2025-02-13 14:35:22 -08:00
committed by Daniel D’Aquino
parent 7831ede057
commit b4b84e6895
7 changed files with 709 additions and 329 deletions

View File

@@ -257,7 +257,7 @@ struct ndb_search_words
// str: cstr
// timestamp: varint
// word_index: varint
//
//
static int ndb_make_text_search_key(unsigned char *buf, int bufsize,
int word_index, int word_len, const char *str,
uint64_t timestamp, uint64_t note_id,
@@ -453,7 +453,7 @@ static inline int ndb_unpack_text_search_key_string(struct cursor *cur,
if (!cursor_skip(cur, *str_len))
return 0;
return 1;
}
@@ -527,7 +527,7 @@ int ndb_filter_clone(struct ndb_filter *dst, struct ndb_filter *src)
data_size = src->data_buf.end - src->data_buf.start;
src_size = data_size + elem_size;
// let's only allow finalized filters to be cloned
// let's only allow finalized filters to be cloned
if (!src || !src->finalized)
return 0;
@@ -584,7 +584,7 @@ int ndb_filter_end(struct ndb_filter *filter)
filter->finalized = 1;
ndb_debug("ndb_filter_end: %ld -> %ld\n", orig_size, elem_len + data_len);
return 1;
}
@@ -649,12 +649,6 @@ ndb_filter_get_string_element(const struct ndb_filter *filter, const struct ndb_
return (const char *)ndb_filter_elements_data(filter, els->elements[index]);
}
uint64_t *
ndb_filter_get_int_element_ptr(struct ndb_filter_elements *els, int index)
{
return &els->elements[index];
}
uint64_t
ndb_filter_get_int_element(const struct ndb_filter_elements *els, int index)
{
@@ -1238,7 +1232,7 @@ static int ndb_write_profile_search_index(struct ndb_txn *txn,
{
int rc;
MDB_val key, val;
key.mv_data = index_key;
key.mv_size = sizeof(*index_key);
val.mv_data = &profile_key;
@@ -1499,7 +1493,7 @@ static inline void ndb_tsid_high(struct ndb_tsid *key, const unsigned char *id)
}
enum ndb_ingester_msgtype {
NDB_INGEST_EVENT, // write json to the ingester queue for processing
NDB_INGEST_EVENT, // write json to the ingester queue for processing
NDB_INGEST_QUIT, // kill ingester thread immediately
};
@@ -1645,7 +1639,7 @@ static struct ndb_migration MIGRATIONS[] = {
int ndb_end_query(struct ndb_txn *txn)
{
// this works on read or write queries.
// this works on read or write queries.
return mdb_txn_commit(txn->mdb_txn) == 0;
}
@@ -1691,7 +1685,7 @@ static void ndb_writer_last_profile_fetch(struct ndb_txn *txn,
{
int rc;
MDB_val key, val;
key.mv_data = (unsigned char*)pubkey;
key.mv_size = 32;
val.mv_data = &fetched_at;
@@ -1967,7 +1961,7 @@ static int ndbprofile_parse_json(flatcc_builder_t *B,
NdbProfile_parse_json_table(ctx, buf, buf + bufsiz, profile);
if (ctx->error)
return 0;
if (!flatcc_builder_end_buffer(B, *profile))
return 0;
@@ -2069,7 +2063,7 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
assert(((uint64_t)note % 4) == 0);
if (note->kind == 0) {
struct ndb_profile_record_builder *b =
struct ndb_profile_record_builder *b =
&out->profile.record;
ndb_process_profile_note(note, b);
@@ -2122,7 +2116,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
}
note_size =
ev->client ?
ev->client ?
ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) :
ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb);
@@ -2317,7 +2311,7 @@ static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b)
}
static int ndb_write_profile_pk_index(struct ndb_txn *txn, struct ndb_note *note, uint64_t profile_key)
{
MDB_val key, val;
int rc;
@@ -2355,7 +2349,7 @@ static int ndb_write_profile(struct ndb_txn *txn,
MDB_val key, val;
MDB_dbi profile_db;
note = profile->note.note;
// add note_key to profile record
@@ -2481,7 +2475,7 @@ static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note)
} else {
// clone existing and add to it
meta = NdbEventMeta_as_root(root);
reactions = NdbEventMeta_reactions_get(meta);
NdbEventMeta_clone(&builder, meta);
NdbEventMeta_reactions_add(&builder, reactions + 1);
@@ -2500,7 +2494,7 @@ static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note)
// if we have the note yet or not
key.mv_data = liked;
key.mv_size = 32;
val.mv_data = root;
val.mv_size = len;
@@ -2522,7 +2516,7 @@ static int ndb_write_reaction_stats(struct ndb_txn *txn, struct ndb_note *note)
static int ndb_write_note_id_index(struct ndb_txn *txn, struct ndb_note *note,
uint64_t note_key)
{
struct ndb_tsid tsid;
int rc;
@@ -3341,7 +3335,7 @@ static int ndb_prefix_matches(struct ndb_text_search_result *result,
// matches are nice but range searches allow us to match prefixes as
// well. A double-char prefix is suffient, but maybe we could up this
// in the future.
//
//
// TODO: How are we handling utf-8 prefix matches like
// japanese?
//
@@ -3357,7 +3351,7 @@ static int ndb_prefix_matches(struct ndb_text_search_result *result,
search_word->word,
search_word->word_len);
if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5))
if (result->prefix_chars <= (int)((double)search_word->word_len / 1.5))
return 0;
return 1;
@@ -3530,7 +3524,7 @@ int ndb_text_search(struct ndb_txn *txn, const char *query,
limit = min(limit, config->limit);
}
// end search config
text_db = txn->lmdb->dbs[NDB_DB_NOTE_TEXT];
make_cursor((unsigned char *)query, (unsigned char *)query + strlen(query), &cur);
@@ -3706,7 +3700,7 @@ static uint64_t ndb_write_note(struct ndb_txn *txn,
// let's quickly sanity check if we already have this note
if (ndb_get_notekey_by_id(txn, note->note->id))
return 0;
// get dbs
note_db = txn->lmdb->dbs[NDB_DB_NOTE];
@@ -3753,7 +3747,7 @@ static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
uint64_t version_key;
version_key = NDB_META_KEY_VERSION;
key.mv_data = &version_key;
key.mv_size = sizeof(version_key);
val.mv_data = &version;
@@ -3871,7 +3865,7 @@ static void *ndb_writer_thread(void *data)
done = 1;
continue;
case NDB_WRITER_PROFILE:
note_nkey =
note_nkey =
ndb_write_note(&txn, &msg->note,
scratch, scratch_size);
if (note_nkey > 0) {
@@ -4010,7 +4004,7 @@ static void *ndb_ingester_thread(void *data)
if (to_write > 0) {
ndb_debug("pushing %d events to write queue\n", to_write);
if (!ndb_writer_queue_msgs(ingester->writer, outs, to_write)) {
ndb_debug("failed pushing %d events to write queue\n", to_write);
ndb_debug("failed pushing %d events to write queue\n", to_write);
}
}
}
@@ -4043,7 +4037,7 @@ static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb,
fprintf(stderr, "ndb writer thread failed to create\n");
return 0;
}
return 1;
}
@@ -4247,7 +4241,7 @@ static int ndb_queue_write_version(struct ndb *ndb, uint64_t version)
static int ndb_run_migrations(struct ndb *ndb)
{
int64_t version, latest_version, i;
latest_version = sizeof(MIGRATIONS) / sizeof(MIGRATIONS[0]);
if ((version = ndb_db_version(ndb)) == -1) {
@@ -4399,7 +4393,7 @@ int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
// Process anostr event from a relay,
//
// ie: ["EVENT", "subid", {"content":"..."}...]
//
//
// This function returns as soon as possible, first copying the passed
// json and then queueing it up for processing. Worker threads then take
// the json and process it.
@@ -5071,7 +5065,7 @@ static inline int ndb_builder_find_str(struct ndb_builder *builder,
uint32_t index = ((uint32_t*)builder->str_indices.start)[i];
const char *some_str = (const char*)builder->strings.start + index;
if (!memcmp(some_str, str, len) && some_str[len] == '\0') {
if (!memcmp(some_str, str, len)) {
// found an existing matching str, use that index
*pstr = ndb_offset_str(index);
return 1;
@@ -5222,7 +5216,7 @@ static int ndb_builder_make_json_str(struct ndb_builder *builder,
int *written, int pack_ids)
{
// let's not care about de-duping these. we should just unescape
// in-place directly into the strings table.
// in-place directly into the strings table.
if (written)
*written = len;
@@ -5422,7 +5416,7 @@ int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
tce->subid_len = toksize(tok);
return ndb_parse_json_note(&parser, &ev->note);
} else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) {
} else if (tok_len == 4 && !memcmp("EOSE", json + tok->start, 4)) {
tce->evtype = NDB_TCE_EOSE;
tok = &parser.toks[parser.i++];
@@ -5984,7 +5978,7 @@ int ndb_stat(struct ndb *ndb, struct ndb_stat *stat)
}
/// Push an element to the current tag
///
///
/// Basic idea is to call ndb_builder_new_tag
inline int ndb_builder_push_tag_str(struct ndb_builder *builder,
const char *str, int len)
@@ -5998,7 +5992,7 @@ inline int ndb_builder_push_tag_str(struct ndb_builder *builder,
//
// CONFIG
//
//
void ndb_default_config(struct ndb_config *config)
{
int cores = get_cpu_cores();
@@ -6478,7 +6472,7 @@ uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filt
ndb_filter_group_init(&sub->group);
if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters))
return 0;
// 500k ought to be enough for anyone
buflen = sizeof(uint64_t) * 65536;
buf = malloc(buflen);