Skip to content

Commit

Permalink
Track EnvWraps to ensure that they can be closed on exit
Browse files Browse the repository at this point in the history
  • Loading branch information
kriszyp committed Apr 30, 2022
1 parent c1497a7 commit 0c6612d
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 20 deletions.
48 changes: 28 additions & 20 deletions src/env.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "lmdb-js.h"

using namespace Napi;

#define IGNORE_NOTFOUND (1)
Expand All @@ -11,7 +12,7 @@ env_tracking_t* EnvWrap::initTracking() {
pthread_mutex_init(tracking->envsLock, nullptr);
return tracking;
}

thread_local std::vector<EnvWrap*> EnvWrap::openEnvWraps;
EnvWrap::EnvWrap(const CallbackInfo& info) : ObjectWrap<EnvWrap>(info) {
int rc;
rc = mdb_env_create(&(this->env));
Expand Down Expand Up @@ -137,7 +138,6 @@ static int encfunc(const MDB_val* src, MDB_val* dst, const MDB_val* key, int enc
#endif

void cleanup(void* data) {
// this may not be called on the main thread
((EnvWrap*) data)->closeEnv();
}

Expand Down Expand Up @@ -222,6 +222,8 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer,
mdb_env_close(env);
this->env = envPath->env;
pthread_mutex_unlock(envTracking->envsLock);
if ((jsFlags & DELETE_ON_CLOSE) || (flags & MDB_OVERLAPPINGSYNC))
openEnvWraps.push_back(this);
return 0;
}
++envPath;
Expand Down Expand Up @@ -274,6 +276,8 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer,
envPath.count = 1;
envPath.deleteOnClose = jsFlags & DELETE_ON_CLOSE;
envTracking->envs.push_back(envPath);
if ((jsFlags & DELETE_ON_CLOSE) || (flags & MDB_OVERLAPPINGSYNC))
openEnvWraps.push_back(this);
pthread_mutex_unlock(envTracking->envsLock);
return 0;

Expand Down Expand Up @@ -313,29 +317,17 @@ NAPI_FUNCTION(setJSFlags) {


void SharedEnv::finish(bool close) {
unsigned int envFlags; // This is primarily useful for detecting termination of threads and sync'ing on their termination
mdb_env_get_flags(env, &envFlags);
if (envFlags & MDB_OVERLAPPINGSYNC) {
mdb_env_sync(env, 1);
}
if (close) {
mdb_env_close(env);
}
if (deleteOnClose) {// I think we can try do delete and just expect it to fail if still open by other threads
unlink(path);
//unlink(strcat(envPath->path, "-lock"));
}
}
NAPI_FUNCTION(EnvWrap::onExit) {
// this is called when a process *or* a thread is forcibly exited. This happens
// before cleanup, but on the main thread cleanup may never be called. On child
// onExit won't be called if there wasn't an error
// close all the environments
pthread_mutex_lock(envTracking->envsLock);
for (auto envWrap : openEnvWraps) {
envWrap->closeEnv();
}
/*pthread_mutex_lock(envTracking->envsLock);
for (auto envPath : envTracking->envs) {
envPath.finish(false);
}
pthread_mutex_unlock(envTracking->envsLock);
pthread_mutex_unlock(envTracking->envsLock);*/
napi_value returnValue;
RETURN_UNDEFINED;
}
Expand All @@ -358,6 +350,13 @@ NAPI_FUNCTION(setEnvsPointer) {
void EnvWrap::closeEnv() {
if (!env)
return;
for (auto ewRef = openEnvWraps.begin(); ewRef != openEnvWraps.end(); ) {
if (*ewRef == this) {
openEnvWraps.erase(ewRef);
break;
}
++ewRef;
}
napi_remove_env_cleanup_hook(napiEnv, cleanup, this);
cleanupStrayTxns();
pthread_mutex_lock(envTracking->envsLock);
Expand All @@ -366,7 +365,16 @@ void EnvWrap::closeEnv() {
envPath->count--;
if (envPath->count <= 0) {
// last thread using it, we can really close it now
envPath->finish(true);
unsigned int envFlags; // This is primarily useful for detecting termination of threads and sync'ing on their termination
mdb_env_get_flags(env, &envFlags);
if (envFlags & MDB_OVERLAPPINGSYNC) {
mdb_env_sync(env, 1);
}
mdb_env_close(env);
if (envPath->deleteOnClose) {
unlink(envPath->path);
//unlink(strcat(envPath->path, "-lock"));
}
envTracking->envs.erase(envPath);
}
break;
Expand Down
1 change: 1 addition & 0 deletions src/lmdb-js.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ class EnvWrap : public ObjectWrap<EnvWrap> {
napi_env napiEnv;
// compression settings and space
Compression *compression;
static thread_local std::vector<EnvWrap*> openEnvWraps;

// Cleans up stray transactions
void cleanupStrayTxns();
Expand Down
16 changes: 16 additions & 0 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,22 @@ describe('lmdb-js', function() {
});
});
});
describe.only('Read-only Threads', function() {
this.timeout(1000000);
it('will run a group of threads with read-only transactions', function(done) {
var child = spawn('node', [fileURLToPath(new URL('./readonly-threads.cjs', import.meta.url))]);
child.stdout.on('data', function(data) {
console.log(data.toString());
});
child.stderr.on('data', function(data) {
console.error(data.toString());
});
child.on('close', function(code) {
code.should.equal(0);
done();
});
});
});
});

function delay(ms) {
Expand Down
91 changes: 91 additions & 0 deletions test/readonly-threads.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
var assert = require('assert');
const { Worker, isMainThread, parentPort, threadId } = require('worker_threads');
var path = require('path');
var numCPUs = require('os').cpus().length;

const { open } = require('../dist/index.cjs');
const MAX_DB_SIZE = 256 * 1024 * 1024;
if (isMainThread) {
var inspector = require('inspector')
// inspector.open(9331, null, true);debugger

// The main thread

let db = open({
path: path.resolve(__dirname, './testdata'),
maxDbs: 10,
mapSize: MAX_DB_SIZE,
maxReaders: 126,
overlappingSync: true,
});

var workerCount = Math.min(numCPUs * 2, 20);
var value = {test: '48656c6c6f2c20776f726c6421'};

// This will start as many workers as there are CPUs available.
var workers = [];
for (var i = 0; i < workerCount; i++) {
var worker = new Worker(__filename);
workers.push(worker);
}

var messages = [];
workers.forEach(function(worker) {
worker.on('message', function(msg) {
messages.push(msg);
// Once every worker has replied with a response for the value
// we can exit the test.

setTimeout(() => {
worker.terminate()
}, 8000);
if (messages.length === workerCount) {
db.close();
for (var i = 0; i < messages.length; i ++) {
assert(messages[i] === value.toString('hex'));
}
console.log("done", threadId)
//setTimeout(() =>
//process.exit(0), 200);
}
});
});

for (var i = 0; i < workers.length; i++) {
var worker = workers[i];
worker.postMessage({key: 'key' + i});
};


} else {
// The worker process
let db = open({
path: path.resolve(__dirname, './testdata'),
maxDbs: 10,
mapSize: MAX_DB_SIZE,
maxReaders: 126,
overlappingSync: true,
});


parentPort.on('message', async function(msg) {
if (msg.key) {
var value = db.get(msg.key);
let lastIterate = db.getRange().iterate()
setInterval(() => {
db.get(msg.key);
let iterate = db.getRange().iterate();
while(!lastIterate.next().done){}
lastIterate = iterate;
}, 1);
setTimeout(() => {
if (value === null) {
parentPort.postMessage("");
} else {
parentPort.postMessage(value.toString('hex'));
}
}, 10000);

}
});
}

0 comments on commit 0c6612d

Please sign in to comment.