nostrdb: ingest: support kind 6 reposts
This also enables processing raw json via ndb import Fixes: https://github.com/damus-io/nostrdb/issues/46 Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
committed by
Daniel D’Aquino
parent
0c483bb55a
commit
47b79fc02e
@@ -2035,6 +2035,37 @@ int ndb_process_profile_note(struct ndb_note *note,
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
|
||||
char *json, unsigned len, unsigned client)
|
||||
{
|
||||
struct ndb_ingester_msg msg;
|
||||
msg.type = NDB_INGEST_EVENT;
|
||||
|
||||
msg.event.json = json;
|
||||
msg.event.len = len;
|
||||
msg.event.client = client;
|
||||
|
||||
return threadpool_dispatch(&ingester->tp, &msg);
|
||||
}
|
||||
|
||||
|
||||
static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json,
|
||||
int len, unsigned client)
|
||||
{
|
||||
// Since we need to return as soon as possible, and we're not
|
||||
// making any assumptions about the lifetime of the string, we
|
||||
// definitely need to copy the json here. In the future once we
|
||||
// have our thread that manages a websocket connection, we can
|
||||
// avoid the copy and just use the buffer we get from that
|
||||
// thread.
|
||||
char *json_copy = strdupn(json, len);
|
||||
if (json_copy == NULL)
|
||||
return 0;
|
||||
|
||||
return ndb_ingester_queue_event(ingester, json_copy, len, client);
|
||||
}
|
||||
|
||||
|
||||
static int ndb_ingester_process_note(secp256k1_context *ctx,
|
||||
struct ndb_note *note,
|
||||
size_t note_size,
|
||||
@@ -2077,12 +2108,18 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
|
||||
out->type = NDB_WRITER_PROFILE;
|
||||
out->profile.note.note = note;
|
||||
out->profile.note.note_len = note_size;
|
||||
} else {
|
||||
out->type = NDB_WRITER_NOTE;
|
||||
out->note.note = note;
|
||||
out->note.note_len = note_size;
|
||||
return 1;
|
||||
} else if (note->kind == 6) {
|
||||
// process the repost if we have a repost event
|
||||
ndb_debug("processing kind 6 repost\n");
|
||||
ndb_ingest_event(ingester, ndb_note_content(note),
|
||||
ndb_note_content_length(note), 0);
|
||||
}
|
||||
|
||||
out->type = NDB_WRITER_NOTE;
|
||||
out->note.note = note;
|
||||
out->note.note_len = note_size;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -4102,19 +4139,6 @@ static int ndb_ingester_destroy(struct ndb_ingester *ingester)
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
|
||||
char *json, unsigned len, unsigned client)
|
||||
{
|
||||
struct ndb_ingester_msg msg;
|
||||
msg.type = NDB_INGEST_EVENT;
|
||||
|
||||
msg.event.json = json;
|
||||
msg.event.len = len;
|
||||
msg.event.client = client;
|
||||
|
||||
return threadpool_dispatch(&ingester->tp, &msg);
|
||||
}
|
||||
|
||||
static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t mapsize)
|
||||
{
|
||||
int rc;
|
||||
@@ -4383,17 +4407,7 @@ void ndb_destroy(struct ndb *ndb)
|
||||
// The client-sent variation of ndb_process_event
|
||||
int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
|
||||
{
|
||||
// Since we need to return as soon as possible, and we're not
|
||||
// making any assumptions about the lifetime of the string, we
|
||||
// definitely need to copy the json here. In the future once we
|
||||
// have our thread that manages a websocket connection, we can
|
||||
// avoid the copy and just use the buffer we get from that
|
||||
// thread.
|
||||
char *json_copy = strdupn(json, len);
|
||||
if (json_copy == NULL)
|
||||
return 0;
|
||||
|
||||
return ndb_ingester_queue_event(&ndb->ingester, json_copy, len, 1);
|
||||
return ndb_ingest_event(&ndb->ingester, json, len, 1);
|
||||
}
|
||||
|
||||
// Process anostr event from a relay,
|
||||
@@ -4415,17 +4429,7 @@ int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
|
||||
//
|
||||
int ndb_process_event(struct ndb *ndb, const char *json, int json_len)
|
||||
{
|
||||
// Since we need to return as soon as possible, and we're not
|
||||
// making any assumptions about the lifetime of the string, we
|
||||
// definitely need to copy the json here. In the future once we
|
||||
// have our thread that manages a websocket connection, we can
|
||||
// avoid the copy and just use the buffer we get from that
|
||||
// thread.
|
||||
char *json_copy = strdupn(json, json_len);
|
||||
if (json_copy == NULL)
|
||||
return 0;
|
||||
|
||||
return ndb_ingester_queue_event(&ndb->ingester, json_copy, json_len, 0);
|
||||
return ndb_ingest_event(&ndb->ingester, json, json_len, 0);
|
||||
}
|
||||
|
||||
|
||||
@@ -5360,12 +5364,19 @@ int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce,
|
||||
jsmntok_t *tok = NULL;
|
||||
int tok_len, res;
|
||||
struct ndb_json_parser parser;
|
||||
struct ndb_event *ev = &fce->event;
|
||||
|
||||
ndb_json_parser_init(&parser, json, len, buf, bufsize);
|
||||
|
||||
if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
|
||||
return res;
|
||||
|
||||
if (parser.toks[0].type == JSMN_OBJECT) {
|
||||
ndb_debug("got raw json in client_event_from_json\n");
|
||||
fce->evtype = NDB_FCE_EVENT;
|
||||
return ndb_parse_json_note(&parser, &ev->note);
|
||||
}
|
||||
|
||||
if (parser.num_tokens <= 3 || parser.toks[0].type != JSMN_ARRAY)
|
||||
return 0;
|
||||
|
||||
@@ -5377,7 +5388,6 @@ int ndb_client_event_from_json(const char *json, int len, struct ndb_fce *fce,
|
||||
|
||||
if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
|
||||
fce->evtype = NDB_FCE_EVENT;
|
||||
struct ndb_event *ev = &fce->event;
|
||||
return ndb_parse_json_note(&parser, &ev->note);
|
||||
}
|
||||
|
||||
@@ -5392,6 +5402,7 @@ int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
|
||||
jsmntok_t *tok = NULL;
|
||||
int tok_len, res;
|
||||
struct ndb_json_parser parser;
|
||||
struct ndb_event *ev = &tce->event;
|
||||
|
||||
tce->subid_len = 0;
|
||||
tce->subid = "";
|
||||
@@ -5401,6 +5412,12 @@ int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
|
||||
if ((res = ndb_json_parser_parse(&parser, cb)) < 0)
|
||||
return res;
|
||||
|
||||
if (parser.toks[0].type == JSMN_OBJECT) {
|
||||
ndb_debug("got raw json in ws_event_from_json\n");
|
||||
tce->evtype = NDB_TCE_EVENT;
|
||||
return ndb_parse_json_note(&parser, &ev->note);
|
||||
}
|
||||
|
||||
if (parser.num_tokens < 3 || parser.toks[0].type != JSMN_ARRAY)
|
||||
return 0;
|
||||
|
||||
@@ -5412,7 +5429,6 @@ int ndb_ws_event_from_json(const char *json, int len, struct ndb_tce *tce,
|
||||
|
||||
if (tok_len == 5 && !memcmp("EVENT", json + tok->start, 5)) {
|
||||
tce->evtype = NDB_TCE_EVENT;
|
||||
struct ndb_event *ev = &tce->event;
|
||||
|
||||
tok = &parser.toks[parser.i++];
|
||||
if (tok->type != JSMN_STRING)
|
||||
|
||||
Reference in New Issue
Block a user