Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@uppy/aws-s3-multipart: Fix race condition in #uploadParts #3955

Merged
merged 2 commits into from Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 2 additions & 6 deletions packages/@uppy/aws-s3-multipart/src/MultipartUploader.js
Expand Up @@ -278,26 +278,25 @@ class MultipartUploader {
#uploadPartRetryable (index, prePreparedPart) {
return this.#retryable({
before: () => {
this.chunkState[index].busy = true
this.partsInProgress += 1
},
attempt: () => this.#uploadPart(index, prePreparedPart),
after: () => {
this.chunkState[index].busy = false
this.partsInProgress -= 1
},
})
}

#uploadPart (index, prePreparedPart) {
this.chunkState[index].busy = true

const valid = typeof prePreparedPart?.url === 'string'
if (!valid) {
throw new TypeError('AwsS3/Multipart: Got incorrect result for `prePreparedPart`, expected an object `{ url }`.')
}

const { url, headers } = prePreparedPart
if (this.#aborted()) {
this.chunkState[index].busy = false
throw createAbortError()
}

Expand Down Expand Up @@ -359,14 +358,12 @@ class MultipartUploader {

xhr.addEventListener('abort', () => {
cleanup()
this.chunkState[index].busy = false

defer.reject(createAbortError())
})

xhr.addEventListener('load', (ev) => {
cleanup()
this.chunkState[index].busy = false

if (ev.target.status < 200 || ev.target.status >= 300) {
const error = new Error('Non 2xx')
Expand Down Expand Up @@ -394,7 +391,6 @@ class MultipartUploader {

xhr.addEventListener('error', (ev) => {
cleanup()
this.chunkState[index].busy = false

const error = new Error('Unknown error')
error.source = ev.target
Expand Down
84 changes: 79 additions & 5 deletions packages/@uppy/aws-s3-multipart/src/index.test.js
Expand Up @@ -54,13 +54,12 @@ describe('AwsS3Multipart', () => {
}),
completeMultipartUpload: jest.fn(async () => ({ location: 'test' })),
abortMultipartUpload: jest.fn(),
prepareUploadParts: jest.fn(async () => {
prepareUploadParts: jest.fn(async (file, { parts }) => {
const presignedUrls = {}
const possiblePartNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
possiblePartNumbers.forEach((partNumber) => {
parts.forEach(({ number }) => {
presignedUrls[
partNumber
] = `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${partNumber}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test`
number
] = `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${number}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test`
})
return { presignedUrls, headers: { 1: { 'Content-MD5': 'foo' } } }
}),
Expand Down Expand Up @@ -182,6 +181,81 @@ describe('AwsS3Multipart', () => {
{ ETag: 'test', PartNumber: 10 },
])
})

it('Keeps chunks marked as busy through retries until they complete', async () => {
const scope = nock(
'https://bucket.s3.us-east-2.amazonaws.com',
).defaultReplyHeaders({
'access-control-allow-headers': '*',
'access-control-allow-method': 'PUT',
'access-control-allow-origin': '*',
'access-control-expose-headers': 'ETag',
})

const fileSize = 50 * MB

scope
.options((uri) => uri.includes('test/upload/multitest.dat'))
.reply(200, '')
scope
.put((uri) => uri.includes('test/upload/multitest.dat') && !uri.includes('partNumber=7'))
.reply(200, '', { ETag: 'test' })

// Fail the part 7 upload once, then let it succeed
let calls = 0
scope
.put((uri) => uri.includes('test/upload/multitest.dat') && uri.includes('partNumber=7'))
.reply(() => (calls++ === 0 ? [500] : [200, '', { ETag: 'test' }]))

scope.persist()

// Spy on the busy/done state of the test chunk (part 7, chunk index 6)
let busySpy
let doneSpy
awsS3Multipart.setOptions({
createMultipartUpload: jest.fn((file) => {
const multipartUploader = awsS3Multipart.uploaders[file.id]
const testChunkState = multipartUploader.chunkState[6]
let busy = false
let done = false
busySpy = jest.fn((value) => { busy = value })
doneSpy = jest.fn((value) => { done = value })
Object.defineProperty(testChunkState, 'busy', { get: () => busy, set: busySpy })
Object.defineProperty(testChunkState, 'done', { get: () => done, set: doneSpy })

return {
uploadId: '6aeb1980f3fc7ce0b5454d25b71992',
key: 'test/upload/multitest.dat',
}
}),
})

core.addFile({
source: 'jest',
name: 'multitest.dat',
type: 'application/octet-stream',
data: new File([new Uint8Array(fileSize)], {
type: 'application/octet-stream',
}),
})

await core.upload()

// The chunk should be marked as done once
expect(doneSpy.mock.calls.length).toEqual(1)
expect(doneSpy.mock.calls[0][0]).toEqual(true)

// Any changes that set busy to false should only happen after the chunk has been marked done,
// otherwise a race condition occurs (see PR #3955)
const doneCallOrderNumber = doneSpy.mock.invocationCallOrder[0]
for (const [index, callArgs] of busySpy.mock.calls.entries()) {
if (callArgs[0] === false) {
expect(busySpy.mock.invocationCallOrder[index]).toBeGreaterThan(doneCallOrderNumber)
}
}

expect(awsS3Multipart.opts.prepareUploadParts.mock.calls.length).toEqual(3)
})
})

describe('MultipartUploader', () => {
Expand Down