Skip to content

Commit

Permalink
Prioritize guarantee of transaction synchronously executing over asyn…
Browse files Browse the repository at this point in the history
…c transaction grouping, #164
  • Loading branch information
kriszyp committed May 30, 2022
1 parent 8152720 commit ccf8407
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 35 deletions.
37 changes: 25 additions & 12 deletions src/writer.cpp
Expand Up @@ -83,19 +83,26 @@ MDB_txn* WriteWorker::AcquireTxn(int* flags) {

// TODO: if the conditionDepth is 0, we could allow the current worker's txn to be continued, committed and restarted
pthread_mutex_lock(envForTxn->writingLock);
if (commitSynchronously && interruptionStatus == ALLOW_COMMIT) {
retry:
if (commitSynchronously && interruptionStatus == WORKER_WAITING) {
//fprintf(stderr, "acquire interupting lock %p %u\n", this, commitSynchronously);
interruptionStatus = INTERRUPT_BATCH;
pthread_cond_signal(envForTxn->writingCond);
pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock);
*flags |= TXN_FROM_WORKER;
return nullptr;
if (interruptionStatus == RESTART_WORKER_TXN) {
*flags |= TXN_FROM_WORKER;
return nullptr;
} else {
interruptionStatus = WORKER_WAITING;
goto retry;
}
} else {
//if (interruptionStatus == RESTART_WORKER_TXN)
// pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock);
interruptionStatus = USER_HAS_LOCK;
*flags |= TXN_FROM_WORKER;
//fprintf(stderr, "acquire lock %p %u\n", txn, commitSynchronously);
//if (txn)
//fprintf(stderr, "acquire lock from worker %p %u\n", txn, commitSynchronously);
return txn;
}
}
Expand All @@ -116,21 +123,24 @@ void AsyncWriteWorker::ReportError(const char* error) {
}
int WriteWorker::WaitForCallbacks(MDB_txn** txn, bool allowCommit, uint32_t* target) {
int rc;
//fprintf(stderr, "wait for callback %p\n", this);
//fprintf(stderr, "wait for callback %p\n", target);
if (!finishedProgress)
SendUpdate();
pthread_cond_signal(envForTxn->writingCond);
interruptionStatus = allowCommit ? ALLOW_COMMIT : 0;
interruptionStatus = WORKER_WAITING;
if (target) {
uint64_t delay = 1;
do {
cond_timedwait(envForTxn->writingCond, envForTxn->writingLock, delay);
delay = delay << 1ll;
//if (delay > 5000)
// fprintf(stderr, "waited, %llu %p\n", delay, *target);
} while(!(
(*target & 0xf) ||
(allowCommit && (interruptionStatus == INTERRUPT_BATCH || finishedProgress))));
//if (delay > 500)
//fprintf(stderr, "waited, %llu %p\n", delay, *target);
if ((*target & 0xf) || allowCommit && finishedProgress) {
// we are in position to continue writing or commit, so forward progress can be made without interrupting yet
interruptionStatus = 0;
return 0;
}
} while(interruptionStatus != INTERRUPT_BATCH);
} else
pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock);
if (interruptionStatus == INTERRUPT_BATCH) { // interrupted by JS code that wants to run a synchronous transaction
Expand Down Expand Up @@ -188,9 +198,11 @@ next_inst: start = instruction++;
status = std::atomic_exchange((std::atomic<int64_t>*)(instruction + 2), (int64_t)1);
if (status == 2) {
//fprintf(stderr, "wait on compression %p\n", instruction);
worker->interruptionStatus = WORKER_WAITING;
do {
pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock);
} while (std::atomic_load((std::atomic<int64_t>*)(instruction + 2)));
worker->interruptionStatus = 0;
} else if (status > 2) {
//fprintf(stderr, "doing the compression ourselves\n");
((Compression*) (size_t) *((double*)&status))->compressInstruction(nullptr, (double*) (instruction + 2));
Expand Down Expand Up @@ -238,8 +250,9 @@ next_inst: start = instruction++;
if (!worker->finishedProgress || conditionDepth) {
if (std::atomic_compare_exchange_strong((std::atomic<uint32_t>*) start,
(uint32_t*) &flags,
(uint32_t)WAITING_OPERATION))
(uint32_t)WAITING_OPERATION)) {
worker->WaitForCallbacks(&txn, conditionDepth == 0, start);
}
goto next_inst;
} else {
if (std::atomic_compare_exchange_strong((std::atomic<uint32_t>*) start,
Expand Down
61 changes: 38 additions & 23 deletions test/index.test.js
Expand Up @@ -17,7 +17,6 @@ import { createBufferForAddress } from '../native.js'
import { RangeIterable } from '../util/RangeIterable.js'
import { assert } from 'console';
import { openAsClass } from '../open.js';

describe('lmdb-js', function() {
let testDirPath = path.resolve(dirName, './testdata-ls');

Expand Down Expand Up @@ -95,13 +94,14 @@ describe('lmdb-js', function() {
})
return
}
it('will not open non-existent db with create disabled', function() {
let noDb = db.open({
name: 'not-there',
create: false,
it('will not open non-existent db with create disabled', function() {
let noDb = db.open({
name: 'not-there',
create: false,
});
should.equal(noDb, undefined);
});
should.equal(noDb, undefined);
});
it('')
it('zero length values', async function() {
await db.committed // should be able to await db even if nothing has happened
db.put(5, asBinary(Buffer.from([])));
Expand Down Expand Up @@ -923,17 +923,19 @@ describe('lmdb-js', function() {
should.equal(db.get('test:c'), undefined)
});
it('read and write with binary encoding', async function() {
should.equal(db.getString('not-there'), undefined);
should.equal(db.getString('not-there'), undefined);
let dbBinary = db.openDB(Object.assign({
name: 'mydb5',
encoding: 'binary'
}));
should.equal(dbBinary.getString('not-there'), undefined);
should.equal(dbBinary.getString('not-there'), undefined);
dbBinary.put('buffer', Buffer.from('hello'));
dbBinary.put('empty', Buffer.from([]));
let big = new Uint8Array(0x21000);
big.fill(3);
dbBinary.put('big', big);
dbBinary.put('big1', big);
dbBinary.put('big2', big);
let promise = dbBinary.put('Uint8Array', new Uint8Array([1,2,3]));
await promise
await promise.flushed
Expand All @@ -945,7 +947,16 @@ describe('lmdb-js', function() {

dbBinary.get('big')[3].should.equal(3);
dbBinary.get('big')[3].should.equal(3);
dbBinary.getBinaryFast('big')[3].should.equal(3);
for (let i = 0; i < 100; i++) {
dbBinary.getBinaryFast('big')[3].should.equal(3);
dbBinary.getBinaryFast('big1')[3].should.equal(3);
dbBinary.getBinaryFast('big2')[3].should.equal(3);
let a
for (let j = 0; j < 100000;j++) {
a = {}
}
await delay(1)
}
dbBinary.getBinaryFast('big')[3].should.equal(3); // do it twice to test detach the previous one
dbBinary.get('Uint8Array')[1].should.equal(2);
Array.from(dbBinary.getRange({ start: 'big' }))[0].value[3].should.equal(3);
Expand Down Expand Up @@ -988,24 +999,28 @@ describe('lmdb-js', function() {
for (let i = 0; i < 100; i++) {
data += Math.random()
}
for (let i = 0; i < 10; i++) {
for (let i = 0; i < 100; i++) {
let db = open(testDirPath + '/təst-close.mdb', {
// name: 'test-close',
compression: true,
overlappingSync: true,
batchStartThreshold: 5,
});
let db2 = db.openDB({
name: 'child'
})
if (i > 0) {
let v = db.get('key')
v = db.get('key1')
v = db.get('key2')
v = db.get('key3')
db.put('key', data);
if (i == 4)
await db.put('key', data);
}
for (let j = 0; j < 100; j++)
db.put('key', data);
let db2 = db.openDB({
name: 'child'
})
db2.get('test')
if (i > 0) {
let v = db.get('key')
v = db.get('key1')
v = db.get('key2')
v = db.get('key3')
db.put('key', data);
if (i == 4)
await db.put('key', data);
}
let promise = db.close();
expect(() => db.put('key1', data)).to.throw();
await promise;
Expand Down

0 comments on commit ccf8407

Please sign in to comment.