diff --git a/src/writer.cpp b/src/writer.cpp index a4430cf58..207bc2c69 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -83,19 +83,26 @@ MDB_txn* WriteWorker::AcquireTxn(int* flags) { // TODO: if the conditionDepth is 0, we could allow the current worker's txn to be continued, committed and restarted pthread_mutex_lock(envForTxn->writingLock); - if (commitSynchronously && interruptionStatus == ALLOW_COMMIT) { + retry: + if (commitSynchronously && interruptionStatus == WORKER_WAITING) { //fprintf(stderr, "acquire interupting lock %p %u\n", this, commitSynchronously); interruptionStatus = INTERRUPT_BATCH; pthread_cond_signal(envForTxn->writingCond); pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock); - *flags |= TXN_FROM_WORKER; - return nullptr; + if (interruptionStatus == RESTART_WORKER_TXN) { + *flags |= TXN_FROM_WORKER; + return nullptr; + } else { + interruptionStatus = WORKER_WAITING; + goto retry; + } } else { //if (interruptionStatus == RESTART_WORKER_TXN) // pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock); interruptionStatus = USER_HAS_LOCK; *flags |= TXN_FROM_WORKER; - //fprintf(stderr, "acquire lock %p %u\n", txn, commitSynchronously); + //if (txn) + //fprintf(stderr, "acquire lock from worker %p %u\n", txn, commitSynchronously); return txn; } } @@ -116,21 +123,24 @@ void AsyncWriteWorker::ReportError(const char* error) { } int WriteWorker::WaitForCallbacks(MDB_txn** txn, bool allowCommit, uint32_t* target) { int rc; - //fprintf(stderr, "wait for callback %p\n", this); + //fprintf(stderr, "wait for callback %p\n", target); if (!finishedProgress) SendUpdate(); pthread_cond_signal(envForTxn->writingCond); - interruptionStatus = allowCommit ? ALLOW_COMMIT : 0; + interruptionStatus = WORKER_WAITING; if (target) { uint64_t delay = 1; do { cond_timedwait(envForTxn->writingCond, envForTxn->writingLock, delay); delay = delay << 1ll; - //if (delay > 5000) - // fprintf(stderr, "waited, %llu %p\n", delay, *target); - } while(!( - (*target & 0xf) || - (allowCommit && (interruptionStatus == INTERRUPT_BATCH || finishedProgress)))); + //if (delay > 500) + //fprintf(stderr, "waited, %llu %p\n", delay, *target); + if ((*target & 0xf) || allowCommit && finishedProgress) { + // we are in position to continue writing or commit, so forward progress can be made without interrupting yet + interruptionStatus = 0; + return 0; + } + } while(interruptionStatus != INTERRUPT_BATCH); } else pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock); if (interruptionStatus == INTERRUPT_BATCH) { // interrupted by JS code that wants to run a synchronous transaction @@ -188,9 +198,11 @@ next_inst: start = instruction++; status = std::atomic_exchange((std::atomic*)(instruction + 2), (int64_t)1); if (status == 2) { //fprintf(stderr, "wait on compression %p\n", instruction); + worker->interruptionStatus = WORKER_WAITING; do { pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock); } while (std::atomic_load((std::atomic*)(instruction + 2))); + worker->interruptionStatus = 0; } else if (status > 2) { //fprintf(stderr, "doing the compression ourselves\n"); ((Compression*) (size_t) *((double*)&status))->compressInstruction(nullptr, (double*) (instruction + 2)); @@ -238,8 +250,9 @@ next_inst: start = instruction++; if (!worker->finishedProgress || conditionDepth) { if (std::atomic_compare_exchange_strong((std::atomic*) start, (uint32_t*) &flags, - (uint32_t)WAITING_OPERATION)) + (uint32_t)WAITING_OPERATION)) { worker->WaitForCallbacks(&txn, conditionDepth == 0, start); + } goto next_inst; } else { if (std::atomic_compare_exchange_strong((std::atomic*) start, diff --git a/test/index.test.js b/test/index.test.js index d97204705..621bd3dfb 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -17,7 +17,6 @@ import { createBufferForAddress } from '../native.js' import { RangeIterable } from '../util/RangeIterable.js' import { assert } from 'console'; import { openAsClass } from '../open.js'; - describe('lmdb-js', function() { let testDirPath = path.resolve(dirName, './testdata-ls'); @@ -95,13 +94,14 @@ describe('lmdb-js', function() { }) return } - it('will not open non-existent db with create disabled', function() { - let noDb = db.open({ - name: 'not-there', - create: false, + it('will not open non-existent db with create disabled', function() { + let noDb = db.open({ + name: 'not-there', + create: false, + }); + should.equal(noDb, undefined); }); - should.equal(noDb, undefined); - }); + it('') it('zero length values', async function() { await db.committed // should be able to await db even if nothing has happened db.put(5, asBinary(Buffer.from([]))); @@ -923,17 +923,19 @@ describe('lmdb-js', function() { should.equal(db.get('test:c'), undefined) }); it('read and write with binary encoding', async function() { - should.equal(db.getString('not-there'), undefined); + should.equal(db.getString('not-there'), undefined); let dbBinary = db.openDB(Object.assign({ name: 'mydb5', encoding: 'binary' })); - should.equal(dbBinary.getString('not-there'), undefined); + should.equal(dbBinary.getString('not-there'), undefined); dbBinary.put('buffer', Buffer.from('hello')); dbBinary.put('empty', Buffer.from([])); let big = new Uint8Array(0x21000); big.fill(3); dbBinary.put('big', big); + dbBinary.put('big1', big); + dbBinary.put('big2', big); let promise = dbBinary.put('Uint8Array', new Uint8Array([1,2,3])); await promise await promise.flushed @@ -945,7 +947,16 @@ describe('lmdb-js', function() { dbBinary.get('big')[3].should.equal(3); dbBinary.get('big')[3].should.equal(3); - dbBinary.getBinaryFast('big')[3].should.equal(3); + for (let i = 0; i < 100; i++) { + dbBinary.getBinaryFast('big')[3].should.equal(3); + dbBinary.getBinaryFast('big1')[3].should.equal(3); + dbBinary.getBinaryFast('big2')[3].should.equal(3); + let a + for (let j = 0; j < 100000;j++) { + a = {} + } + await delay(1) + } dbBinary.getBinaryFast('big')[3].should.equal(3); // do it twice to test detach the previous one dbBinary.get('Uint8Array')[1].should.equal(2); Array.from(dbBinary.getRange({ start: 'big' }))[0].value[3].should.equal(3); @@ -988,24 +999,28 @@ describe('lmdb-js', function() { for (let i = 0; i < 100; i++) { data += Math.random() } - for (let i = 0; i < 10; i++) { + for (let i = 0; i < 100; i++) { let db = open(testDirPath + '/təst-close.mdb', { // name: 'test-close', compression: true, overlappingSync: true, + batchStartThreshold: 5, }); - let db2 = db.openDB({ - name: 'child' - }) - if (i > 0) { - let v = db.get('key') - v = db.get('key1') - v = db.get('key2') - v = db.get('key3') - db.put('key', data); - if (i == 4) - await db.put('key', data); - } + for (let j = 0; j < 100; j++) + db.put('key', data); + let db2 = db.openDB({ + name: 'child' + }) + db2.get('test') + if (i > 0) { + let v = db.get('key') + v = db.get('key1') + v = db.get('key2') + v = db.get('key3') + db.put('key', data); + if (i == 4) + await db.put('key', data); + } let promise = db.close(); expect(() => db.put('key1', data)).to.throw(); await promise;