Skip to content

Commit

Permalink
feat(lib-storage): improve performance by reducing buffer copies (#5078)
Browse files Browse the repository at this point in the history
* feat(lib-storage): improve performance by reducing buffer copies

* test(lib-storage): add e2e tests
  • Loading branch information
kuhe committed Mar 12, 2024
1 parent 93f81c4 commit e2fb9d5
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 222 deletions.
5 changes: 5 additions & 0 deletions lib/lib-storage/jest.config.e2e.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Jest configuration for the lib-storage end-to-end test suite.
// Used by the package's "test:e2e" script (`jest -c jest.config.e2e.js`).
module.exports = {
// Use the ts-jest preset so the TypeScript spec files can be run by Jest.
preset: "ts-jest",
// Only pick up spec files with the `.e2e.spec.ts` suffix, in any directory.
testMatch: ["**/*.e2e.spec.ts"],
// Abort the run on the first failing test suite instead of continuing
// (per Jest's `bail` option, where `true` is equivalent to 1).
bail: true,
};
3 changes: 2 additions & 1 deletion lib/lib-storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"build:types:downlevel": "downlevel-dts dist-types dist-types/ts3.4",
"clean": "rimraf ./dist-* && rimraf *.tsbuildinfo",
"extract:docs": "api-extractor run --local",
"test": "jest"
"test": "jest",
"test:e2e": "jest -c jest.config.e2e.js"
},
"engines": {
"node": ">=14.0.0"
Expand Down
243 changes: 122 additions & 121 deletions lib/lib-storage/src/Upload.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,135 +312,136 @@ describe(Upload.name, () => {
expect(result.Location).toEqual("https://example-bucket.example-host.com/folder/example-key");
});

it("should upload using multi-part when parts are larger than part size", async () => {
// create a string that's larger than 5MB.
const partSize = 1024 * 1024 * 5;
const largeBuffer = Buffer.from("#".repeat(partSize + 10));
const firstBuffer = largeBuffer.subarray(0, partSize);
const secondBuffer = largeBuffer.subarray(partSize);
const actionParams = { ...params, Body: largeBuffer };
const upload = new Upload({
params: actionParams,
client: new S3({}),
});
await upload.done();
expect(sendMock).toHaveBeenCalledTimes(4);
// create multipartMock is called correctly.
expect(createMultipartMock).toHaveBeenCalledTimes(1);
expect(createMultipartMock).toHaveBeenCalledWith({
...actionParams,
Body: undefined,
});
// upload parts is called correctly.
expect(uploadPartMock).toHaveBeenCalledTimes(2);
expect(uploadPartMock).toHaveBeenNthCalledWith(1, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(firstBuffer),
PartNumber: 1,
UploadId: "mockuploadId",
});
expect(uploadPartMock).toHaveBeenNthCalledWith(2, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(secondBuffer),
PartNumber: 2,
UploadId: "mockuploadId",
});
// complete multipart upload is called correctly.
expect(completeMultipartMock).toHaveBeenCalledTimes(1);
expect(completeMultipartMock).toHaveBeenLastCalledWith({
...actionParams,
Body: undefined,
UploadId: "mockuploadId",
MultipartUpload: {
Parts: [
{
ETag: "mock-upload-Etag",
PartNumber: 1,
},
{
ETag: "mock-upload-Etag-2",
PartNumber: 2,
},
],
},
});
[
{ type: "buffer", largeBuffer: Buffer.from("#".repeat(DEFAULT_PART_SIZE + 10)) },
{ type: "Uint8array", largeBuffer: Uint8Array.from(Buffer.from("#".repeat(DEFAULT_PART_SIZE + 10))) },
].forEach(({ type, largeBuffer }) => {
it(`should upload using multi-part when parts are larger than part size ${type}`, async () => {
const firstBuffer = largeBuffer.subarray(0, DEFAULT_PART_SIZE);
const secondBuffer = largeBuffer.subarray(DEFAULT_PART_SIZE);
const actionParams = { ...params, Body: largeBuffer };
const upload = new Upload({
params: actionParams,
client: new S3({}),
});
await upload.done();
expect(sendMock).toHaveBeenCalledTimes(4);
// create multipartMock is called correctly.
expect(createMultipartMock).toHaveBeenCalledTimes(1);
expect(createMultipartMock).toHaveBeenCalledWith({
...actionParams,
Body: undefined,
});
// upload parts is called correctly.
expect(uploadPartMock).toHaveBeenCalledTimes(2);
expect(uploadPartMock).toHaveBeenNthCalledWith(1, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(firstBuffer),
PartNumber: 1,
UploadId: "mockuploadId",
});
expect(uploadPartMock).toHaveBeenNthCalledWith(2, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(secondBuffer),
PartNumber: 2,
UploadId: "mockuploadId",
});
// complete multipart upload is called correctly.
expect(completeMultipartMock).toHaveBeenCalledTimes(1);
expect(completeMultipartMock).toHaveBeenLastCalledWith({
...actionParams,
Body: undefined,
UploadId: "mockuploadId",
MultipartUpload: {
Parts: [
{
ETag: "mock-upload-Etag",
PartNumber: 1,
},
{
ETag: "mock-upload-Etag-2",
PartNumber: 2,
},
],
},
});

// no tags were passed.
expect(putObjectTaggingMock).toHaveBeenCalledTimes(0);
// put was not called
expect(putObjectMock).toHaveBeenCalledTimes(0);
});
// no tags were passed.
expect(putObjectTaggingMock).toHaveBeenCalledTimes(0);
// put was not called
expect(putObjectMock).toHaveBeenCalledTimes(0);
});

it("should upload using multi-part when parts are larger than part size stream", async () => {
// create a string that's larger than 5MB.
const firstBuffer = largeBuffer.subarray(0, DEFAULT_PART_SIZE);
const secondBuffer = largeBuffer.subarray(DEFAULT_PART_SIZE);
const streamBody = Readable.from(
(function* () {
yield largeBuffer;
})()
);
const actionParams = { ...params, Body: streamBody };
const upload = new Upload({
params: actionParams,
client: new S3({}),
});

it("should upload using multi-part when parts are larger than part size stream", async () => {
// create a string that's larger than 5MB.
const largeBuffer = Buffer.from("#".repeat(DEFAULT_PART_SIZE + 10));
const firstBuffer = largeBuffer.subarray(0, DEFAULT_PART_SIZE);
const secondBuffer = largeBuffer.subarray(DEFAULT_PART_SIZE);
const streamBody = Readable.from(
(function* () {
yield largeBuffer;
})()
);
const actionParams = { ...params, Body: streamBody };
const upload = new Upload({
params: actionParams,
client: new S3({}),
});
await upload.done();

await upload.done();
expect(sendMock).toHaveBeenCalledTimes(4);
// create multipartMock is called correctly.
expect(createMultipartMock).toHaveBeenCalledTimes(1);
expect(createMultipartMock).toHaveBeenCalledWith({
...actionParams,
Body: undefined,
});

expect(sendMock).toHaveBeenCalledTimes(4);
// create multipartMock is called correctly.
expect(createMultipartMock).toHaveBeenCalledTimes(1);
expect(createMultipartMock).toHaveBeenCalledWith({
...actionParams,
Body: undefined,
});
// upload parts is called correctly.
expect(uploadPartMock).toHaveBeenCalledTimes(2);
expect(uploadPartMock).toHaveBeenNthCalledWith(1, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(firstBuffer),
PartNumber: 1,
UploadId: "mockuploadId",
});

// upload parts is called correctly.
expect(uploadPartMock).toHaveBeenCalledTimes(2);
expect(uploadPartMock).toHaveBeenNthCalledWith(1, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(firstBuffer),
PartNumber: 1,
UploadId: "mockuploadId",
});
expect(uploadPartMock).toHaveBeenNthCalledWith(2, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(secondBuffer),
PartNumber: 2,
UploadId: "mockuploadId",
});

expect(uploadPartMock).toHaveBeenNthCalledWith(2, {
...actionParams,
// @ts-ignore extended custom matcher
Body: expect.toHaveSameHashAsBuffer(secondBuffer),
PartNumber: 2,
UploadId: "mockuploadId",
});
// complete multipart upload is called correctly.
expect(completeMultipartMock).toHaveBeenCalledTimes(1);
expect(completeMultipartMock).toHaveBeenLastCalledWith({
...actionParams,
Body: undefined,
UploadId: "mockuploadId",
MultipartUpload: {
Parts: [
{
ETag: "mock-upload-Etag",
PartNumber: 1,
},
{
ETag: "mock-upload-Etag-2",
PartNumber: 2,
},
],
},
});

// complete multipart upload is called correctly.
expect(completeMultipartMock).toHaveBeenCalledTimes(1);
expect(completeMultipartMock).toHaveBeenLastCalledWith({
...actionParams,
Body: undefined,
UploadId: "mockuploadId",
MultipartUpload: {
Parts: [
{
ETag: "mock-upload-Etag",
PartNumber: 1,
},
{
ETag: "mock-upload-Etag-2",
PartNumber: 2,
},
],
},
// no tags were passed.
expect(putObjectTaggingMock).toHaveBeenCalledTimes(0);
// put was not called
expect(putObjectMock).toHaveBeenCalledTimes(0);
});

// no tags were passed.
expect(putObjectTaggingMock).toHaveBeenCalledTimes(0);
// put was not called
expect(putObjectMock).toHaveBeenCalledTimes(0);
});

it("should add tags to the object if tags have been added PUT", async () => {
Expand Down
8 changes: 7 additions & 1 deletion lib/lib-storage/src/bytelength.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { Buffer } from "buffer"; // do not remove this import: Node.js buffer or buffer NPM module for browser.

import { ClientDefaultValues } from "./runtimeConfig";

export const byteLength = (input: any) => {
if (input === null || input === undefined) return 0;
if (typeof input === "string") input = Buffer.from(input);

if (typeof input === "string") {
return Buffer.byteLength(input);
}

if (typeof input.byteLength === "number") {
return input.byteLength;
} else if (typeof input.length === "number") {
Expand Down
36 changes: 22 additions & 14 deletions lib/lib-storage/src/chunker.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
import { Buffer } from "buffer";
import { Buffer } from "buffer"; // do not remove this import: Node.js buffer or buffer NPM module for browser.
import { Readable } from "stream";

import { getChunkBuffer } from "./chunks/getChunkBuffer";
import { getChunkStream } from "./chunks/getChunkStream";
import { getChunkUint8Array } from "./chunks/getChunkUint8Array";
import { getDataReadable } from "./chunks/getDataReadable";
import { getDataReadableStream } from "./chunks/getDataReadableStream";
import { BodyDataTypes } from "./types";
import type { RawDataPart } from "./Upload";

export const getChunk = (data: BodyDataTypes, partSize: number) => {
if (data instanceof Buffer) {
return getChunkBuffer(data, partSize);
} else if (data instanceof Readable) {
export const getChunk = (data: BodyDataTypes, partSize: number): AsyncGenerator<RawDataPart, void, undefined> => {
if (data instanceof Uint8Array) {
// includes Buffer (extends Uint8Array)
return getChunkUint8Array(data, partSize);
}

if (data instanceof Readable) {
return getChunkStream<Readable>(data, partSize, getDataReadable);
} else if (data instanceof String || typeof data === "string" || data instanceof Uint8Array) {
// chunk Strings, Uint8Array.
return getChunkBuffer(Buffer.from(data), partSize);
}

if (data instanceof String || typeof data === "string") {
return getChunkUint8Array(Buffer.from(data), partSize);
}

if (typeof (data as any).stream === "function") {
// approximate support for Blobs.
return getChunkStream<ReadableStream>((data as any).stream(), partSize, getDataReadableStream);
} else if (data instanceof ReadableStream) {
}

if (data instanceof ReadableStream) {
return getChunkStream<ReadableStream>(data, partSize, getDataReadableStream);
} else {
throw new Error(
"Body Data is unsupported format, expected data to be one of: string | Uint8Array | Buffer | Readable | ReadableStream | Blob;."
);
}

throw new Error(
"Body Data is unsupported format, expected data to be one of: string | Uint8Array | Buffer | Readable | ReadableStream | Blob;."
);
};

0 comments on commit e2fb9d5

Please sign in to comment.