nostrdb: windows: fix threading bugs
Signed-off-by: William Casarin <jb55@jb55.com>
This commit is contained in:
committed by
Daniel D’Aquino
parent
3186b0e1d3
commit
02df1e209b
@@ -3980,6 +3980,7 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
|
|||||||
|
|
||||||
static void *ndb_writer_thread(void *data)
|
static void *ndb_writer_thread(void *data)
|
||||||
{
|
{
|
||||||
|
ndb_debug("started writer thread\n");
|
||||||
struct ndb_writer *writer = data;
|
struct ndb_writer *writer = data;
|
||||||
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
|
struct ndb_writer_msg msgs[THREAD_QUEUE_BATCH], *msg;
|
||||||
struct written_note written_notes[THREAD_QUEUE_BATCH];
|
struct written_note written_notes[THREAD_QUEUE_BATCH];
|
||||||
@@ -3999,6 +4000,7 @@ static void *ndb_writer_thread(void *data)
|
|||||||
while (!done) {
|
while (!done) {
|
||||||
txn.mdb_txn = NULL;
|
txn.mdb_txn = NULL;
|
||||||
num_notes = 0;
|
num_notes = 0;
|
||||||
|
ndb_debug("writer waiting for items\n");
|
||||||
popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
|
popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
|
||||||
ndb_debug("writer popped %d items\n", popped);
|
ndb_debug("writer popped %d items\n", popped);
|
||||||
|
|
||||||
@@ -4029,6 +4031,7 @@ static void *ndb_writer_thread(void *data)
|
|||||||
switch (msg->type) {
|
switch (msg->type) {
|
||||||
case NDB_WRITER_QUIT:
|
case NDB_WRITER_QUIT:
|
||||||
// quits are handled before this
|
// quits are handled before this
|
||||||
|
ndb_debug("writer thread got quit message\n");
|
||||||
done = 1;
|
done = 1;
|
||||||
continue;
|
continue;
|
||||||
case NDB_WRITER_PROFILE:
|
case NDB_WRITER_PROFILE:
|
||||||
@@ -4242,14 +4245,18 @@ static int ndb_writer_destroy(struct ndb_writer *writer)
|
|||||||
|
|
||||||
// kill thread
|
// kill thread
|
||||||
msg.type = NDB_WRITER_QUIT;
|
msg.type = NDB_WRITER_QUIT;
|
||||||
|
ndb_debug("writer: pushing quit message\n");
|
||||||
if (!prot_queue_push(&writer->inbox, &msg)) {
|
if (!prot_queue_push(&writer->inbox, &msg)) {
|
||||||
// queue is too full to push quit message. just kill it.
|
// queue is too full to push quit message. just kill it.
|
||||||
|
ndb_debug("writer: terminating thread\n");
|
||||||
THREAD_TERMINATE(writer->thread_id);
|
THREAD_TERMINATE(writer->thread_id);
|
||||||
} else {
|
} else {
|
||||||
|
ndb_debug("writer: joining thread\n");
|
||||||
THREAD_FINISH(writer->thread_id);
|
THREAD_FINISH(writer->thread_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// cleanup
|
// cleanup
|
||||||
|
ndb_debug("writer: cleaning up protected queue\n");
|
||||||
prot_queue_destroy(&writer->inbox);
|
prot_queue_destroy(&writer->inbox);
|
||||||
|
|
||||||
free(writer->queue_buf);
|
free(writer->queue_buf);
|
||||||
@@ -4515,12 +4522,17 @@ void ndb_destroy(struct ndb *ndb)
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
// ingester depends on writer and must be destroyed first
|
// ingester depends on writer and must be destroyed first
|
||||||
|
ndb_debug("destroying ingester\n");
|
||||||
ndb_ingester_destroy(&ndb->ingester);
|
ndb_ingester_destroy(&ndb->ingester);
|
||||||
|
ndb_debug("destroying writer\n");
|
||||||
ndb_writer_destroy(&ndb->writer);
|
ndb_writer_destroy(&ndb->writer);
|
||||||
|
ndb_debug("destroying monitor\n");
|
||||||
ndb_monitor_destroy(&ndb->monitor);
|
ndb_monitor_destroy(&ndb->monitor);
|
||||||
|
|
||||||
|
ndb_debug("closing env\n");
|
||||||
mdb_env_close(ndb->lmdb.env);
|
mdb_env_close(ndb->lmdb.env);
|
||||||
|
|
||||||
|
ndb_debug("ndb destroyed\n");
|
||||||
free(ndb);
|
free(ndb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,27 +5,47 @@
|
|||||||
#include <windows.h>
|
#include <windows.h>
|
||||||
|
|
||||||
#define ErrCode() GetLastError()
|
#define ErrCode() GetLastError()
|
||||||
#define pthread_t HANDLE
|
// Define POSIX-like thread types
|
||||||
#define pthread_mutex_t HANDLE
|
typedef HANDLE pthread_t;
|
||||||
#define pthread_cond_t HANDLE
|
typedef CRITICAL_SECTION pthread_mutex_t;
|
||||||
#define pthread_cond_destroy(x)
|
typedef CONDITION_VARIABLE pthread_cond_t;
|
||||||
#define pthread_mutex_unlock(x) ReleaseMutex(*x)
|
|
||||||
#define pthread_mutex_destroy(x) \
|
#define ErrCode() GetLastError()
|
||||||
(CloseHandle(*x) ? 0 : ErrCode())
|
|
||||||
#define pthread_mutex_lock(x) WaitForSingleObject(*x, INFINITE)
|
// Mutex functions
|
||||||
#define pthread_mutex_init(mutex, attr) \
|
#define pthread_mutex_init(mutex, attr) \
|
||||||
((*mutex = CreateMutex(NULL, FALSE, NULL)) ? 0 : ErrCode())
|
(InitializeCriticalSection(mutex), 0)
|
||||||
#define pthread_cond_init(x, attr) (InitializeConditionVariable(x), 0)
|
|
||||||
#define pthread_cond_signal(x) SetEvent(*x)
|
#define pthread_mutex_destroy(mutex) \
|
||||||
#define pthread_cond_wait(cond,mutex) do{SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE);}while(0)
|
(DeleteCriticalSection(mutex), 0)
|
||||||
#define THREAD_CREATE(thr,start,arg) \
|
|
||||||
(((thr) = CreateThread(NULL, 0, start, arg, 0, NULL)) ? 0 : ErrCode())
|
#define pthread_mutex_lock(mutex) \
|
||||||
#define THREAD_FINISH(thr) \
|
(EnterCriticalSection(mutex), 0)
|
||||||
(WaitForSingleObject(thr, INFINITE) ? ErrCode() : 0)
|
|
||||||
#define THREAD_TERMINATE(thr) \
|
#define pthread_mutex_unlock(mutex) \
|
||||||
(TerminateThread(thr, 0) ? ErrCode() : 0)
|
(LeaveCriticalSection(mutex), 0)
|
||||||
#define LOCK_MUTEX(mutex) WaitForSingleObject(mutex, INFINITE)
|
|
||||||
#define UNLOCK_MUTEX(mutex) ReleaseMutex(mutex)
|
// Condition variable functions
|
||||||
|
#define pthread_cond_init(cond, attr) \
|
||||||
|
(InitializeConditionVariable(cond), 0)
|
||||||
|
|
||||||
|
#define pthread_cond_destroy(cond)
|
||||||
|
|
||||||
|
#define pthread_cond_signal(cond) \
|
||||||
|
(WakeConditionVariable(cond), 0)
|
||||||
|
|
||||||
|
#define pthread_cond_wait(cond, mutex) \
|
||||||
|
(SleepConditionVariableCS(cond, mutex, INFINITE) ? 0 : ErrCode())
|
||||||
|
|
||||||
|
// Thread functions
|
||||||
|
#define THREAD_CREATE(thr, start, arg) \
|
||||||
|
(((thr = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)start, arg, 0, NULL)) != NULL) ? 0 : ErrCode())
|
||||||
|
|
||||||
|
#define THREAD_FINISH(thr) \
|
||||||
|
(WaitForSingleObject(thr, INFINITE), CloseHandle(thr), 0)
|
||||||
|
|
||||||
|
#define THREAD_TERMINATE(thr) \
|
||||||
|
(TerminateThread(thr, 0) ? ErrCode() : 0)
|
||||||
|
|
||||||
#else // _WIN32
|
#else // _WIN32
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
|||||||
Reference in New Issue
Block a user