Skip to content

Commit

Permalink
[node-bridge][build-utils] Add multi payload lambda handling in node-…
Browse files Browse the repository at this point in the history
…bridge (#7507)

### Related Issues

x-ref: vercel/next.js#34935

### 📋 Checklist

<!--
  Please keep your PR as a Draft until the checklist is complete
-->

#### Tests

- [ ] The code changed/added as part of this PR has been covered with tests
- [ ] All tests pass locally with `yarn test-unit`

#### Code Review

- [ ] This PR has a concise title and thorough description useful to a reviewer
- [ ] Issue from task tracker has a link to this PR
  • Loading branch information
ijjk committed Mar 6, 2022
1 parent 1e54d60 commit 6b2a1c3
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 12 deletions.
11 changes: 11 additions & 0 deletions packages/build-utils/src/lambda.ts
Expand Up @@ -22,6 +22,7 @@ export interface LambdaOptionsBase {
environment?: Environment;
allowQuery?: string[];
regions?: string[];
supportsMultiPayloads?: boolean;
}

export interface LambdaOptionsWithFiles extends LambdaOptionsBase {
Expand Down Expand Up @@ -57,6 +58,7 @@ export class Lambda {
* @deprecated Use `await lambda.createZip()` instead.
*/
zipBuffer?: Buffer;
supportsMultiPayloads?: boolean;

constructor(opts: LambdaOptions) {
const {
Expand All @@ -67,6 +69,7 @@ export class Lambda {
environment = {},
allowQuery,
regions,
supportsMultiPayloads,
} = opts;
if ('files' in opts) {
assert(typeof opts.files === 'object', '"files" must be an object');
Expand Down Expand Up @@ -94,6 +97,13 @@ export class Lambda {
);
}

if (supportsMultiPayloads !== undefined) {
assert(
typeof supportsMultiPayloads === 'boolean',
'"supportsMultiPayloads" is not a boolean'
);
}

if (regions !== undefined) {
assert(Array.isArray(regions), '"regions" is not an Array');
assert(
Expand All @@ -111,6 +121,7 @@ export class Lambda {
this.allowQuery = allowQuery;
this.regions = regions;
this.zipBuffer = 'zipBuffer' in opts ? opts.zipBuffer : undefined;
this.supportsMultiPayloads = supportsMultiPayloads;
}

async createZip(): Promise<Buffer> {
Expand Down
114 changes: 102 additions & 12 deletions packages/node-bridge/bridge.js
Expand Up @@ -17,21 +17,49 @@ process.on('unhandledRejection', err => {
*/
function normalizeProxyEvent(event) {
let bodyBuffer;
const { method, path, headers, encoding, body } = JSON.parse(event.body);
const { method, path, headers, encoding, body, payloads } = JSON.parse(
event.body
);

if (body) {
if (encoding === 'base64') {
bodyBuffer = Buffer.from(body, encoding);
} else if (encoding === undefined) {
bodyBuffer = Buffer.from(body);
/**
*
* @param {string | Buffer} b
* @returns Buffer
*/
const normalizeBody = b => {
if (b) {
if (encoding === 'base64') {
bodyBuffer = Buffer.from(b, encoding);
} else if (encoding === undefined) {
bodyBuffer = Buffer.from(b);
} else {
throw new Error(`Unsupported encoding: ${encoding}`);
}
} else {
throw new Error(`Unsupported encoding: ${encoding}`);
bodyBuffer = Buffer.alloc(0);
}
} else {
bodyBuffer = Buffer.alloc(0);
return bodyBuffer;
};

if (payloads) {
/**
* @param {{ body: string | Buffer }} payload
*/
const normalizePayload = payload => {
payload.body = normalizeBody(payload.body);
};
payloads.forEach(normalizePayload);
}
bodyBuffer = normalizeBody(body);

return { isApiGateway: false, method, path, headers, body: bodyBuffer };
return {
isApiGateway: false,
method,
path,
headers,
body: bodyBuffer,
payloads,
};
}

/**
Expand Down Expand Up @@ -152,9 +180,71 @@ class Bridge {
*/
async launcher(event, context) {
context.callbackWaitsForEmptyEventLoop = false;
const { port } = await this.listening;

const normalizedEvent = normalizeEvent(event);

if (
'payloads' in normalizedEvent &&
Array.isArray(normalizedEvent.payloads)
) {
// statusCode and headers are required to match when using
// multiple payloads in a single invocation so we can use
// the first
let statusCode = 200;
/**
* @type {import('http').IncomingHttpHeaders}
*/
let headers = {};
/**
* @type {string}
*/
let combinedBody = '';
const multipartBoundary = 'payload-separator';
const CLRF = '\r\n';

// we execute the payloads one at a time to ensure
// lambda semantics
for (let i = 0; i < normalizedEvent.payloads.length; i++) {
const currentPayload = normalizedEvent.payloads[i];
const response = await this.handleEvent(currentPayload);
// build a combined body using multipart
// https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
combinedBody += `--${multipartBoundary}${CLRF}`;
if (response.headers['content-type']) {
combinedBody += `content-type: ${response.headers['content-type']}${CLRF}${CLRF}`;
}
combinedBody += response.body;
combinedBody += CLRF;

if (i === normalizedEvent.payloads.length - 1) {
combinedBody += `--${multipartBoundary}--${CLRF}`;
}

statusCode = response.statusCode;
headers = response.headers;
}

headers[
'content-type'
] = `multipart/mixed; boundary="${multipartBoundary}"`;

return {
headers,
statusCode,
body: combinedBody,
encoding: 'base64',
};
} else {
return this.handleEvent(normalizedEvent);
}
}

/**
*
* @param {ReturnType<typeof normalizeEvent>} normalizedEvent
* @return {Promise<{statusCode: number, headers: import('http').IncomingHttpHeaders, body: string, encoding: 'base64'}>}
*/
async handleEvent(normalizedEvent) {
const { port } = await this.listening;
const { isApiGateway, method, headers, body } = normalizedEvent;
let { path } = normalizedEvent;

Expand Down
72 changes: 72 additions & 0 deletions packages/node-bridge/test/bridge.test.js
Expand Up @@ -84,6 +84,78 @@ test('`NowProxyEvent` normalizing', async () => {
server.close();
});

test('multi-payload handling', async () => {
const server = new Server((req, res) => {
res.setHeader(
'content-type',
req.url.includes('_next/data') ? 'application/json' : 'text/html'
);

res.end(
JSON.stringify({
method: req.method,
path: req.url,
headers: req.headers,
})
);
});
const bridge = new Bridge(server);
bridge.listen();
const context = { callbackWaitsForEmptyEventLoop: true };
const result = await bridge.launcher(
{
Action: 'Invoke',
body: JSON.stringify({
payloads: [
{
method: 'GET',
headers: { foo: 'baz' },
path: '/nowproxy',
},
{
method: 'GET',
headers: { foo: 'baz' },
path: '/_next/data/build-id/nowproxy.json',
},
],
}),
},
context
);
assert.equal(result.encoding, 'base64');
assert.equal(result.statusCode, 200);
assert.equal(
result.headers['content-type'],
'multipart/mixed; boundary="payload-separator"'
);
const bodies = [];
const payloadParts = result.body.split('\r\n');

payloadParts.forEach(item => {
if (
item.trim() &&
!item.startsWith('content-type:') &&
!item.startsWith('--payload')
) {
bodies.push(
JSON.parse(
Buffer.from(item.split('--payload-separator')[0], 'base64').toString()
)
);
}
});

assert.equal(bodies[0].method, 'GET');
assert.equal(bodies[0].path, '/nowproxy');
assert.equal(bodies[0].headers.foo, 'baz');
assert.equal(bodies[1].method, 'GET');
assert.equal(bodies[1].path, '/_next/data/build-id/nowproxy.json');
assert.equal(bodies[1].headers.foo, 'baz');
assert.equal(context.callbackWaitsForEmptyEventLoop, false);

server.close();
});

test('consumeEvent', async () => {
const mockListener = jest.fn((req, res) => {
res.end('hello');
Expand Down
1 change: 1 addition & 0 deletions packages/node-bridge/types.ts
Expand Up @@ -16,6 +16,7 @@ export interface VercelProxyRequest {
path: string;
headers: IncomingHttpHeaders;
body: Buffer;
payloads?: Array<VercelProxyRequest>;
}
export interface VercelProxyResponse {
statusCode: number;
Expand Down

0 comments on commit 6b2a1c3

Please sign in to comment.