diff --git a/lib/content/write.js b/lib/content/write.js index b6f5c56..740a298 100644 --- a/lib/content/write.js +++ b/lib/content/write.js @@ -15,6 +15,9 @@ const fsm = require('fs-minipass') module.exports = write +// Cache of move operations in process so we don't duplicate +const moveOperations = {} + async function write (cache, data, opts = {}) { const { algorithms, size, integrity } = opts @@ -159,15 +162,20 @@ async function makeTmp (cache, opts) { async function moveToDestination (tmp, cache, sri, opts) { const destination = contentPath(cache, sri) const destDir = path.dirname(destination) - - await fs.mkdir(destDir, { recursive: true }) + if (moveOperations[destination]) { + return moveOperations[destination] + } + moveOperations[destination] = fs.mkdir(destDir, { recursive: true }) + .then(() => moveFile(tmp.target, destination, { overwrite: false })) try { - await moveFile(tmp.target, destination, { overwrite: false }) + await moveOperations[destination] tmp.moved = true } catch (err) { if (!err.message.startsWith('The destination file exists')) { throw Object.assign(err, { code: 'EEXIST' }) } + } finally { + delete moveOperations[destination] } } diff --git a/test/put.js b/test/put.js index 375e3dd..d31b8dc 100644 --- a/test/put.js +++ b/test/put.js @@ -128,3 +128,17 @@ t.test('signals error if error writing to cache', async t => { t.equal(bulkErr.code, 'EBADSIZE', 'got error from bulk write') t.equal(streamErr.code, 'EBADSIZE', 'got error from stream write') }) + +t.test('concurrent puts', async t => { + const CACHE = t.testdir() + await Promise.all([ + put(CACHE, KEY, CONTENT), + put(CACHE, KEY, CONTENT), + put(CACHE, KEY, CONTENT), + put(CACHE, KEY, CONTENT), + put(CACHE, KEY, CONTENT), + put(CACHE, KEY, CONTENT), + put(CACHE, KEY, CONTENT), + ]) + t.ok('No errors') +})