Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deduplicate move operations #203

Merged
merged 1 commit into from May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 14 additions & 3 deletions lib/content/write.js
Expand Up @@ -15,6 +15,9 @@ const fsm = require('fs-minipass')

module.exports = write

// Cache of move operations in process so we don't duplicate
const moveOperations = new Map()

async function write (cache, data, opts = {}) {
const { algorithms, size, integrity } = opts

Expand Down Expand Up @@ -159,15 +162,23 @@ async function makeTmp (cache, opts) {
async function moveToDestination (tmp, cache, sri, opts) {
const destination = contentPath(cache, sri)
const destDir = path.dirname(destination)

await fs.mkdir(destDir, { recursive: true })
if (moveOperations.has(destination)) {
return moveOperations.get(destination)
}
fritzy marked this conversation as resolved.
Show resolved Hide resolved
moveOperations.set(
destination,
fs.mkdir(destDir, { recursive: true })
.then(() => moveFile(tmp.target, destination, { overwrite: false }))
)
try {
await moveFile(tmp.target, destination, { overwrite: false })
await moveOperations.get(destination)
tmp.moved = true
} catch (err) {
if (!err.message.startsWith('The destination file exists')) {
throw Object.assign(err, { code: 'EEXIST' })
}
} finally {
moveOperations.delete(destination)
}
}

Expand Down
16 changes: 16 additions & 0 deletions test/put.js
@@ -1,6 +1,7 @@
'use strict'

const fs = require('fs/promises')
const path = require('path')
const index = require('../lib/entry-index')
const memo = require('../lib/memoization')
const t = require('tap')
Expand Down Expand Up @@ -128,3 +129,18 @@ t.test('signals error if error writing to cache', async t => {
t.equal(bulkErr.code, 'EBADSIZE', 'got error from bulk write')
t.equal(streamErr.code, 'EBADSIZE', 'got error from stream write')
})

t.test('concurrent puts', async t => {
const CACHE = t.testdir()
await Promise.all([
put(CACHE, KEY, CONTENT),
put(CACHE, KEY, CONTENT),
put(CACHE, KEY, CONTENT),
put(CACHE, KEY, CONTENT),
put(CACHE, KEY, CONTENT),
put(CACHE, KEY, CONTENT),
put(CACHE, KEY, CONTENT),
])
const tmpFiles = await fs.readdir(path.join(CACHE, 'tmp'))
t.strictSame(tmpFiles, [], 'Nothing left in tmp')
})