Skip to content

Commit

Permalink
@uppy/aws-s3-multipart: add support for presigned URL batching (#3056)
Browse files Browse the repository at this point in the history
* Add first basic test for aws-s3-multipart

* Add another test for GH actions run

* Add whatwg-fetch to aws-s3-multipart package.json

This is to satisfy linter errors, and the other s3 plugin
does this as well

* Adding more tests

* More test progress

* Tests working with nock

* Add another test, for part uploaded event

* Remove spec and refactor to beforeEach

* S3 multipart batch presign working...in theory

* Working batching and tests

* Min needed for presign batch functionality

* Fix lint issues

* Fix syntax error in companion

* Add companion batch presign endpoint

* Improve tests

* Fix each -> forEach

* Remove .prettierrc

* Adding docs and allow for headers with batch prepare

* Review fixes

* Rename prepareUploadPart to prepareUploadParts and use breaking changes
  to batch presign URLs there.
* Get rid of unnecessary batchPartPresign and minNeededForPresignBatch
  options. Use Math.ceil(limit / 2) for min needed instead.
* Fix up tests.
* Fix up type tests.
* Update documentation.

* Review fixes

* Change _ private methods to use # syntax
* Minor code and docs improvements

* Change Promise.resolve() microtasks to async/await
  • Loading branch information
martin-brennan committed Aug 11, 2021
1 parent 8fb6078 commit d613b84
Show file tree
Hide file tree
Showing 9 changed files with 359 additions and 108 deletions.
4 changes: 4 additions & 0 deletions packages/@uppy/aws-s3-multipart/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
"@uppy/companion-client": "file:../companion-client",
"@uppy/utils": "file:../utils"
},
"devDependencies": {
"whatwg-fetch": "3.6.2",
"nock": "^13.1.0"
},
"peerDependencies": {
"@uppy/core": "^1.0.0"
}
Expand Down
180 changes: 105 additions & 75 deletions packages/@uppy/aws-s3-multipart/src/MultipartUploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,17 @@ class MultipartUploader {
// upload was created already. That also ensures that the sequencing is right
// (so the `OP` definitely happens if the upload is created).
//
// This mostly exists to make `_abortUpload` work well: only sending the abort request if
// This mostly exists to make `#abortUpload` work well: only sending the abort request if
// the upload was already created, and if the createMultipartUpload request is still in flight,
// aborting it immediately after it finishes.
this.createdPromise = Promise.reject() // eslint-disable-line prefer-promise-reject-errors
this.isPaused = false
this.partsInProgress = 0
this.chunks = null
this.chunkState = null
this.lockedCandidatesForBatch = []

this._initChunks()
this.#initChunks()

this.createdPromise.catch(() => {}) // silence uncaught rejection warning
}
Expand All @@ -71,11 +72,11 @@ class MultipartUploader {
*
* @returns {boolean}
*/
_aborted () {
#aborted () {
return this.abortController.signal.aborted
}

_initChunks () {
#initChunks () {
const chunks = []
const desiredChunkSize = this.options.getChunkSize(this.file)
// at least 5MB per request, at most 10k requests
Expand All @@ -100,10 +101,10 @@ class MultipartUploader {
}))
}

_createUpload () {
#createUpload () {
this.createdPromise = Promise.resolve().then(() => this.options.createMultipartUpload())
return this.createdPromise.then((result) => {
if (this._aborted()) throw createAbortError()
if (this.#aborted()) throw createAbortError()

const valid = typeof result === 'object' && result
&& typeof result.uploadId === 'string'
Expand All @@ -116,18 +117,19 @@ class MultipartUploader {
this.uploadId = result.uploadId

this.options.onStart(result)
this._uploadParts()
this.#uploadParts()
}).catch((err) => {
this._onError(err)
this.#onError(err)
})
}

_resumeUpload () {
return Promise.resolve().then(() => this.options.listParts({
uploadId: this.uploadId,
key: this.key,
})).then((parts) => {
if (this._aborted()) throw createAbortError()
async #resumeUpload () {
try {
const parts = await this.options.listParts({
uploadId: this.uploadId,
key: this.key,
})
if (this.#aborted()) throw createAbortError()

parts.forEach((part) => {
const i = part.PartNumber - 1
Expand All @@ -146,26 +148,40 @@ class MultipartUploader {
})
}
})
this._uploadParts()
}).catch((err) => {
this._onError(err)
})
this.#uploadParts()
} catch (err) {
this.#onError(err)
}
}

_uploadParts () {
#uploadParts () {
if (this.isPaused) return

const need = this.options.limit - this.partsInProgress
if (need === 0) return

// All parts are uploaded.
if (this.chunkState.every((state) => state.done)) {
this._completeUpload()
this.#completeUpload()
return
}

// For a 100MB file, with the default min chunk size of 5MB and a limit of 10:
//
// Total 20 parts
// ---------
// Need 1 is 10
// Need 2 is 5
// Need 3 is 5
const need = this.options.limit - this.partsInProgress
const completeChunks = this.chunkState.filter((state) => state.done).length
const remainingChunks = this.chunks.length - completeChunks
let minNeeded = Math.ceil(this.options.limit / 2)
if (minNeeded > remainingChunks) {
minNeeded = remainingChunks
}
if (need < minNeeded) return

const candidates = []
for (let i = 0; i < this.chunkState.length; i++) {
if (this.lockedCandidatesForBatch.includes(i)) continue
const state = this.chunkState[i]
if (state.done || state.busy) continue

Expand All @@ -174,18 +190,22 @@ class MultipartUploader {
break
}
}

candidates.forEach((index) => {
this._uploadPartRetryable(index).then(() => {
// Continue uploading parts
this._uploadParts()
}, (err) => {
this._onError(err)
if (candidates.length === 0) return

this.#prepareUploadParts(candidates).then((result) => {
candidates.forEach((index) => {
const partNumber = index + 1
const prePreparedPart = { url: result.presignedUrls[partNumber], headers: result.headers }
this.#uploadPartRetryable(index, prePreparedPart).then(() => {
this.#uploadParts()
}, (err) => {
this.#onError(err)
})
})
})
}

_retryable ({ before, attempt, after }) {
#retryable ({ before, attempt, after }) {
const { retryDelays } = this.options
const { signal } = this.abortController

Expand All @@ -201,7 +221,7 @@ class MultipartUploader {
}

const doAttempt = (retryAttempt) => attempt().catch((err) => {
if (this._aborted()) throw createAbortError()
if (this.#aborted()) throw createAbortError()

if (shouldRetry(err) && retryAttempt < retryDelays.length) {
return delay(retryDelays[retryAttempt], { signal })
Expand All @@ -219,53 +239,62 @@ class MultipartUploader {
})
}

_uploadPartRetryable (index) {
return this._retryable({
async #prepareUploadParts (candidates) {
this.lockedCandidatesForBatch.push(...candidates)

const result = await this.options.prepareUploadParts({
key: this.key,
uploadId: this.uploadId,
partNumbers: candidates.map((index) => index + 1),
})

const valid = typeof result?.presignedUrls === 'object'
if (!valid) {
throw new TypeError(
'AwsS3/Multipart: Got incorrect result from `prepareUploadParts()`, expected an object `{ presignedUrls }`.'
)
}
return result
}

#uploadPartRetryable (index, prePreparedPart) {
return this.#retryable({
before: () => {
this.partsInProgress += 1
},
attempt: () => this._uploadPart(index),
attempt: () => this.#uploadPart(index, prePreparedPart),
after: () => {
this.partsInProgress -= 1
},
})
}

_uploadPart (index) {
#uploadPart (index, prePreparedPart) {
const body = this.chunks[index]
this.chunkState[index].busy = true

return Promise.resolve().then(() => this.options.prepareUploadPart({
key: this.key,
uploadId: this.uploadId,
body,
number: index + 1,
})).then((result) => {
const valid = typeof result === 'object' && result
&& typeof result.url === 'string'
if (!valid) {
throw new TypeError('AwsS3/Multipart: Got incorrect result from `prepareUploadPart()`, expected an object `{ url }`.')
}
const valid = typeof prePreparedPart?.url === 'string'
if (!valid) {
throw new TypeError('AwsS3/Multipart: Got incorrect result for `prePreparedPart`, expected an object `{ url }`.')
}

return result
}).then(({ url, headers }) => {
if (this._aborted()) {
this.chunkState[index].busy = false
throw createAbortError()
}
const { url, headers } = prePreparedPart
if (this.#aborted()) {
this.chunkState[index].busy = false
throw createAbortError()
}

return this._uploadPartBytes(index, url, headers)
})
return this.#uploadPartBytes(index, url, headers)
}

_onPartProgress (index, sent, total) {
#onPartProgress (index, sent, total) {
this.chunkState[index].uploaded = ensureInt(sent)

const totalUploaded = this.chunkState.reduce((n, c) => n + c.uploaded, 0)
this.options.onProgress(totalUploaded, this.file.size)
}

_onPartComplete (index, etag) {
#onPartComplete (index, etag) {
this.chunkState[index].etag = etag
this.chunkState[index].done = true

Expand All @@ -278,7 +307,7 @@ class MultipartUploader {
this.options.onPartComplete(part)
}

_uploadPartBytes (index, url, headers) {
#uploadPartBytes (index, url, headers) {
const body = this.chunks[index]
const { signal } = this.abortController

Expand Down Expand Up @@ -307,7 +336,7 @@ class MultipartUploader {
xhr.upload.addEventListener('progress', (ev) => {
if (!ev.lengthComputable) return

this._onPartProgress(index, ev.loaded, ev.total)
this.#onPartProgress(index, ev.loaded, ev.total)
})

xhr.addEventListener('abort', (ev) => {
Expand All @@ -328,7 +357,7 @@ class MultipartUploader {
return
}

this._onPartProgress(index, body.size, body.size)
this.#onPartProgress(index, body.size, body.size)

// NOTE This must be allowed by CORS.
const etag = ev.target.getResponseHeader('ETag')
Expand All @@ -337,7 +366,7 @@ class MultipartUploader {
return
}

this._onPartComplete(index, etag)
this.#onPartComplete(index, etag)
defer.resolve()
})

Expand All @@ -355,22 +384,23 @@ class MultipartUploader {
return promise
}

_completeUpload () {
async #completeUpload () {
// Parts may not have completed uploading in sorted order, if limit > 1.
this.parts.sort((a, b) => a.PartNumber - b.PartNumber)

return Promise.resolve().then(() => this.options.completeMultipartUpload({
key: this.key,
uploadId: this.uploadId,
parts: this.parts,
})).then((result) => {
try {
const result = await this.options.completeMultipartUpload({
key: this.key,
uploadId: this.uploadId,
parts: this.parts,
})
this.options.onSuccess(result)
}, (err) => {
this._onError(err)
})
} catch (err) {
this.#onError(err)
}
}

_abortUpload () {
#abortUpload () {
this.abortController.abort()

this.createdPromise.then(() => {
Expand All @@ -383,7 +413,7 @@ class MultipartUploader {
})
}

_onError (err) {
#onError (err) {
if (err && err.name === 'AbortError') {
return
}
Expand All @@ -394,9 +424,9 @@ class MultipartUploader {
start () {
this.isPaused = false
if (this.uploadId) {
this._resumeUpload()
this.#resumeUpload()
} else {
this._createUpload()
this.#createUpload()
}
}

Expand All @@ -413,7 +443,7 @@ class MultipartUploader {

if (!really) return this.pause()

this._abortUpload()
this.#abortUpload()
}
}

Expand Down
10 changes: 5 additions & 5 deletions packages/@uppy/aws-s3-multipart/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module.exports = class AwsS3Multipart extends BasePlugin {
retryDelays: [0, 1000, 3000, 5000],
createMultipartUpload: this.createMultipartUpload.bind(this),
listParts: this.listParts.bind(this),
prepareUploadPart: this.prepareUploadPart.bind(this),
prepareUploadParts: this.prepareUploadParts.bind(this),
abortMultipartUpload: this.abortMultipartUpload.bind(this),
completeMultipartUpload: this.completeMultipartUpload.bind(this),
}
Expand Down Expand Up @@ -101,11 +101,11 @@ module.exports = class AwsS3Multipart extends BasePlugin {
.then(assertServerError)
}

prepareUploadPart (file, { key, uploadId, number }) {
this.assertHost('prepareUploadPart')
prepareUploadParts (file, { key, uploadId, partNumbers }) {
this.assertHost('prepareUploadParts')

const filename = encodeURIComponent(key)
return this.client.get(`s3/multipart/${uploadId}/${number}?key=${filename}`)
return this.client.get(`s3/multipart/${uploadId}/batch?key=${filename}?partNumbers=${partNumbers.join(',')}`)
.then(assertServerError)
}

Expand Down Expand Up @@ -191,7 +191,7 @@ module.exports = class AwsS3Multipart extends BasePlugin {
// .bind to pass the file object to each handler.
createMultipartUpload: this.opts.createMultipartUpload.bind(this, file),
listParts: this.opts.listParts.bind(this, file),
prepareUploadPart: this.opts.prepareUploadPart.bind(this, file),
prepareUploadParts: this.opts.prepareUploadParts.bind(this, file),
completeMultipartUpload: this.opts.completeMultipartUpload.bind(this, file),
abortMultipartUpload: this.opts.abortMultipartUpload.bind(this, file),
getChunkSize: this.opts.getChunkSize ? this.opts.getChunkSize.bind(this) : null,
Expand Down

0 comments on commit d613b84

Please sign in to comment.