@tus/server: add GCS locker #616
base: main
Conversation
Thanks for putting in the time to contribute!
I'm not an expert on (distributed) locking, but conceptually I think GCS storage as a locker only makes sense if you're already deploying your server within GCS infrastructure (so it's faster) and you have a bucket in the region where the uploads happen. My assumption is if those conditions aren't met, things will be slow? AFAIK GCS has strong consistency within the same region but eventual consistency for multi-region.
Maybe you can elaborate on your use case?
Indeed, I hadn't even thought about using this locker with a store other than GCS. In my case, the storage bucket and the locker bucket are the same, and I think the only case in which they should be separated is when the storage bucket is not in the standard storage class. Anyway, I'm not sure that e.g. Firestore would greatly outperform GCS when used with a different store. Regarding region latency, the user should be aware of that and choose a suitable region. Of course a Redis-based implementation would be much better, but this may be a considerable alternative until that is implemented. Shall I move this locker to the gcs-store package to suggest the primary application?
This is interesting because such approaches would allow tus server to implement lockers directly on top of cloud storages instead of using external tools like Redis. However, I would like to see some evidence that this approach actually provides exclusive access to uploads. Is there some blog post that looked into the mechanisms at play here? Are all involved operations strongly consistent?
GCS is strongly consistent, but indeed concurrency was not ensured in my previous approach. I have reworked the code based on this article. Note that I had to upgrade @google-cloud/storage because the previous version was missing a type export. Also, this feature should be moved to a separate package or into gcs-store, as I'm importing from @google-cloud/storage.
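The core mechanism such articles rely on can be sketched in isolation: a GCS write with the precondition `ifGenerationMatch: 0` only succeeds if the object does not exist yet (otherwise the request fails with HTTP 412), so competing processes racing to create the same lock file get exactly one winner. The sketch below uses an in-memory stand-in; `FakeBucket` and `saveIfAbsent` are illustrative names, not the @google-cloud/storage API.

```typescript
// In-memory stand-in for a GCS bucket with generation preconditions,
// illustrating why `ifGenerationMatch: 0` yields mutual exclusion.
type StoredObject = {generation: number; data: string}

class FakeBucket {
  private objects = new Map<string, StoredObject>()
  private nextGeneration = 1

  // Mirrors File#save with {preconditionOpts: {ifGenerationMatch: 0}}:
  // the write succeeds only if the object does not exist yet.
  saveIfAbsent(name: string, data: string): boolean {
    if (this.objects.has(name)) {
      return false // precondition failed (HTTP 412 in real GCS)
    }
    this.objects.set(name, {generation: this.nextGeneration++, data})
    return true
  }
}

// Two competing processes try to take the same lock; exactly one wins.
const bucket = new FakeBucket()
const winner = bucket.saveIfAbsent('upload-123.lock', 'owner-a')
const loser = bucket.saveIfAbsent('upload-123.lock', 'owner-b')
console.log(winner, loser) // true false
```

Because GCS evaluates the precondition atomically on the server, this works across machines without any coordination beyond the bucket itself.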
Really nice article, thanks for sharing. It does also say this:
But here we are using it for individual uploads, not batches. Or even smaller with resumed uploads (or where a client sets
For the last 10 days it has been running in production without problems. We have about 5,000 uploads per day. In e2e tests it was indeed slightly slower for 140 files compared to XHR, but I could easily compensate for this by increasing the number of parallel uploads. If I measure individual uploads, the time elapsed between lock and unlock is mostly 20–400 ms with the memory locker and 300–400 ms with the GCS locker.
That's great to hear! I'm in favor of adding this into the package then.
Overall this looks very good! Also happy with the extensive code comments.
Some things needed:
- The build is currently failing.
- We need to update the `peerDependencies` to not allow any version of `@google-cloud/storage`.
- Docs. We should also talk about when to (not) use this lock and the things to watch out for, such as what values to set for the TTL and watch interval.
- A test similar to this:

tus-node-server/test/e2e.test.ts
Lines 1045 to 1145 in a0f9da1

```ts
describe('File Store with Locking', () => {
  before(() => {
    server = new Server({
      path: STORE_PATH,
      datastore: new FileStore({directory: `./${STORE_PATH}`}),
      locker: new MemoryLocker(),
    })
    listener = server.listen()
    agent = request.agent(listener)
  })

  after((done) => {
    // Remove the files directory
    rimraf(FILES_DIRECTORY, async (err) => {
      if (err) {
        return done(err)
      }
      // Clear the config
      // @ts-expect-error we can consider a generic to pass to
      // datastore to narrow down the store type
      const uploads = (server.datastore.configstore as Configstore).list?.() ?? []
      for (const upload in uploads) {
        // @ts-expect-error we can consider a generic to pass to
        // datastore to narrow down the store type
        await (server.datastore.configstore as Configstore).delete(upload)
      }
      listener.close()
      return done()
    })
  })

  it('will allow another request to acquire the lock by cancelling the previous request', async () => {
    const res = await agent
      .post(STORE_PATH)
      .set('Tus-Resumable', TUS_RESUMABLE)
      .set('Upload-Length', TEST_FILE_SIZE)
      .set('Upload-Metadata', TEST_METADATA)
      .set('Tus-Resumable', TUS_RESUMABLE)
      .expect(201)

    assert.equal('location' in res.headers, true)
    assert.equal(res.headers['tus-resumable'], TUS_RESUMABLE)

    // Save the id for subsequent tests
    const file_id = res.headers.location.split('/').pop()
    const file_size = parseInt(TEST_FILE_SIZE, 10)

    // Slow down writing
    const originalWrite = server.datastore.write.bind(server.datastore)
    sinon.stub(server.datastore, 'write').callsFake((stream, ...args) => {
      const throttleStream = new Throttle({bps: file_size / 4})
      return originalWrite(stream.pipe(throttleStream), ...args)
    })

    const data = Buffer.alloc(parseInt(TEST_FILE_SIZE, 10), 'a')
    const httpAgent = new Agent({
      maxSockets: 2,
      maxFreeSockets: 10,
      timeout: 10000,
      keepAlive: true,
    })

    const createPatchReq = (offset: number) => {
      return agent
        .patch(`${STORE_PATH}/${file_id}`)
        .agent(httpAgent)
        .set('Tus-Resumable', TUS_RESUMABLE)
        .set('Upload-Offset', offset.toString())
        .set('Content-Type', 'application/offset+octet-stream')
        .send(data.subarray(offset))
    }

    const req1 = createPatchReq(0).then((e) => e)
    await wait(100)

    const req2 = agent
      .head(`${STORE_PATH}/${file_id}`)
      .agent(httpAgent)
      .set('Tus-Resumable', TUS_RESUMABLE)
      .expect(200)
      .then((e) => e)

    const [res1, res2] = await Promise.allSettled([req1, req2])
    assert.equal(res1.status, 'fulfilled')
    assert.equal(res2.status, 'fulfilled')
    assert.equal(res1.value.statusCode, 400)
    assert.equal(res1.value.headers['upload-offset'] !== TEST_FILE_SIZE, true)
    assert.equal(res2.value.statusCode, 200)

    // Verify that we are able to resume even if the first request
    // was cancelled by the second request trying to acquire the lock
    const offset = parseInt(res2.value.headers['upload-offset'], 10)
    const finishedUpload = await createPatchReq(offset)

    assert.equal(finishedUpload.statusCode, 204)
    assert.equal(finishedUpload.headers['upload-offset'], TEST_FILE_SIZE)
  }).timeout(20000)
})
```
If you need help with any of these let me know.
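To make the documentation point concrete, the locker's configuration might end up looking roughly like this. This is a hypothetical sketch: `GCSLocker` and the option names `ttl` and `watchInterval` are assumptions drawn from the review comment, not the final API.

```ts
// Hypothetical configuration sketch, not the final API.
const locker = new GCSLocker({
  bucket: storage.bucket('my-lock-bucket'),
  // How long a lock may live without renewal before it is considered
  // stale; must comfortably exceed the longest expected request.
  ttl: 30_000,
  // How often a lock holder polls for release requests from other
  // instances; lower values react faster but cost more GCS requests.
  watchInterval: 5_000,
})
```

The docs would then explain the trade-off: a short TTL risks expiring a healthy lock mid-upload, while a long watch interval delays handover when another request wants the lock.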
```ts
 * Fetch metadata of the lock file.
 */
public async getMeta() {
  return (await this.lockFile.getMetadata())[0] as GCSLockFileMetadata
```
Are we sure this can never return `undefined` or something else?
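One way to address this concern is to guard against an empty metadata response instead of asserting the type with a cast. The sketch below uses a fake object standing in for @google-cloud/storage's `File`, and `GCSLockFileMetadata` is reduced to illustrative fields; names other than `getMetadata` are assumptions.

```typescript
// Illustrative sketch: fail loudly if getMetadata() yields nothing,
// rather than casting a possibly-undefined value.
interface GCSLockFileMetadata {
  generation?: string
  metageneration?: string
}

class LockFileWrapper {
  // `lockFile` stands in for a @google-cloud/storage File instance.
  constructor(private lockFile: {getMetadata(): Promise<unknown[]>}) {}

  async getMeta(): Promise<GCSLockFileMetadata> {
    const [meta] = await this.lockFile.getMetadata()
    if (meta == null) {
      throw new Error('lock file metadata is unavailable')
    }
    return meta as GCSLockFileMetadata
  }
}

// Usage with fakes: one returns metadata, the other an empty response.
const ok = new LockFileWrapper({getMetadata: async () => [{generation: '42'}]})
const empty = new LockFileWrapper({getMetadata: async () => []})

ok.getMeta().then((m) => console.log(m.generation)) // '42'
empty.getMeta().catch((e) => console.log(e.message)) // 'lock file metadata is unavailable'
```

The cast is still there, but the error surfaces at the call site with a clear message instead of as a later `undefined` property access.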
```ts
await this.deleteReleaseRequest()
await this.lockFile.delete({ifGenerationMatch: this.currentMetaGeneration})
```
Would it make sense to do this in parallel?
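A hedged sketch of the suggestion, using timed stand-ins for the two delete calls to show the latency win (the 50 ms delays and function names are illustrative, not the PR's code):

```typescript
// Illustration of why Promise.all can roughly halve cleanup latency:
// two independent ~50 ms deletes finish in ~50 ms instead of ~100 ms.
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function sequentialCleanup(): Promise<number> {
  const start = Date.now()
  await delay(50) // stands in for deleteReleaseRequest()
  await delay(50) // stands in for lockFile.delete({ifGenerationMatch: ...})
  return Date.now() - start
}

async function parallelCleanup(): Promise<number> {
  const start = Date.now()
  await Promise.all([delay(50), delay(50)])
  return Date.now() - start
}

sequentialCleanup().then((t) => console.log('sequential ~', t, 'ms'))
parallelCleanup().then((t) => console.log('parallel ~', t, 'ms'))
```

One caveat worth weighing: with `Promise.all`, if the lock-file delete fails the release request may already be gone, so the error-handling semantics differ slightly from the sequential version.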
Thank you for the article, I will have a look at it! I am wondering whether S3 has similar capabilities and a locker could nowadays be implemented on top of it as well.
This PR is not complete yet; it is missing unit tests (the code is tested), README updates, and a changeset. Despite all that, I would like to ask you to review my approach first so I won't write needless tests. I have documented the process in detail, but feel free to ask questions.