Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix leaks and stale connections #148

Merged
merged 6 commits into from Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/green-pens-boil.md
@@ -0,0 +1,9 @@
---
'@whatwg-node/fetch': patch
'@whatwg-node/server': patch
---

- On Node 14, fix the return method of Response.body's AsyncIterator to close HTTP connection correctly
- On Node 14, handle ReadableStream's cancel correctly if Response.body is a ReadableStream
- Do not modify ReadableStream.cancel's behavior but handle it internally
- On Node 18, do not combine Response.body's return and AbortController which causes a memory leak
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -11,7 +11,7 @@
"ci:lint": "eslint --ext .ts . --output-file eslint_report.json --format json",
"prettier": "prettier --ignore-path .gitignore --ignore-path .prettierignore --write --list-different .",
"prettier:check": "prettier --ignore-path .gitignore --ignore-path .prettierignore --check .",
"test": "jest --detectOpenHandles --detectLeaks --logHeapUsage --runInBand",
"test": "jest --detectOpenHandles --detectLeaks",
"prerelease": "yarn build",
"prerelease-canary": "yarn build",
"release": "changeset publish"
Expand Down
95 changes: 40 additions & 55 deletions packages/fetch/dist/create-node-ponyfill.js
@@ -1,4 +1,5 @@
const handleFileRequest = require("./handle-file-request");
const readableStreamToReadable = require("./readableStreamToReadable");

module.exports = function createNodePonyfill(opts = {}) {

Expand Down Expand Up @@ -79,49 +80,6 @@ module.exports = function createNodePonyfill(opts = {}) {
}
}

// ReadableStream doesn't handle aborting properly, so we need to patch it
ponyfills.ReadableStream = class PonyfillReadableStream extends ponyfills.ReadableStream {
constructor(underlyingSource, ...opts) {
super({
...underlyingSource,
cancel: (e) => {
this.cancelled = true;
if (underlyingSource.cancel) {
return underlyingSource.cancel(e);
}
}
}, ...opts);
this.underlyingSource = underlyingSource;
}
[Symbol.asyncIterator]() {
const asyncIterator = super[Symbol.asyncIterator]();
return {
next: (...args) => asyncIterator.next(...args),
throw: (...args) => asyncIterator.throw(...args),
return: async (e) => {
const originalResult = await asyncIterator.return(e);
if (!this.cancelled) {
this.cancelled = true;
if (this.underlyingSource.cancel) {
await this.underlyingSource.cancel(e);
}
}
return originalResult;
}
}
}
async cancel(e) {
const originalResult = !super.locked && await super.cancel(e);
if (!this.cancelled) {
this.cancelled = true;
if (this.underlyingSource.cancel) {
await this.underlyingSource.cancel(e);
}
}
return originalResult;
}
}

if (!ponyfills.crypto) {
const cryptoModule = require("crypto");
ponyfills.crypto = cryptoModule.webcrypto;
Expand Down Expand Up @@ -252,7 +210,7 @@ module.exports = function createNodePonyfill(opts = {}) {
options.body = streams.Readable.from(encoder.encode());
}
if (options.body[Symbol.toStringTag] === 'ReadableStream') {
options.body = streams.Readable.fromWeb ? streams.Readable.fromWeb(options.body) : streams.Readable.from(options.body);
options.body = readableStreamToReadable(options.body);
}
}
super(requestOrUrl, options);
Expand All @@ -270,24 +228,51 @@ module.exports = function createNodePonyfill(opts = {}) {
if (requestOrUrl.url.startsWith('file:')) {
return handleFileRequest(requestOrUrl.url, ponyfills.Response);
}
return realFetch(requestOrUrl);
const abortCtrl = new ponyfills.AbortController();

return realFetch(requestOrUrl, {
...options,
signal: abortCtrl.signal
}).then(res => {
return new Proxy(res, {
get(target, prop, receiver) {
if (prop === 'body') {
return new Proxy(res.body, {
get(target, prop, receiver) {
if (prop === Symbol.asyncIterator) {
return () => {
const originalAsyncIterator = target[Symbol.asyncIterator]();
return {
next() {
return originalAsyncIterator.next();
},
return() {
abortCtrl.abort();
return originalAsyncIterator.return();
},
throw(error) {
abortCtrl.abort(error);
return originalAsyncIterator.throw(error);
}
}
}
}
return Reflect.get(target, prop, receiver);
}
})
}
return Reflect.get(target, prop, receiver);
}
})
});
};

ponyfills.fetch = fetch;

const OriginalResponse = ponyfills.Response || nodeFetch.Response;
ponyfills.Response = function Response(body, init) {
if (body != null && body[Symbol.toStringTag] === 'ReadableStream') {
const actualBody = streams.Readable.fromWeb ? streams.Readable.fromWeb(body) : streams.Readable.from(body, {
emitClose: true,
autoDestroy: true,
});
actualBody.on('pause', () => {
body.cancel();
})
actualBody.on('close', () => {
body.cancel();
})
const actualBody = readableStreamToReadable(body);
// Polyfill ReadableStream is not working well with node-fetch's Response
return new OriginalResponse(actualBody, init);
}
Expand Down
19 changes: 19 additions & 0 deletions packages/fetch/dist/readableStreamToReadable.js
@@ -0,0 +1,19 @@
const streams = require('stream');

module.exports = function readableStreamToReadable(readableStream) {
return streams.Readable.from({
[Symbol.asyncIterator]() {
const reader = readableStream.getReader();
return {
next() {
return reader.read();
},
async return() {
reader.releaseLock();
await readableStream.cancel();
return Promise.resolve({ done: true });
}
}
}
});
}
25 changes: 0 additions & 25 deletions packages/server/src/utils.ts
Expand Up @@ -181,31 +181,6 @@ export async function sendNodeResponse(
body.destroy();
});
body.pipe(serverResponse);
} else if (isReadableStream(body)) {
const reader = body.getReader();
serverResponse.once('close', () => {
reader.cancel().finally(() => {
reader.releaseLock();
body.cancel();
});
});
// eslint-disable-next-line no-inner-declarations
function pump() {
reader
.read()
.then(({ done, value }) => {
if (done) {
serverResponse.end(resolve);
return;
}
serverResponse.write(value, pump);
})
.catch(error => {
console.error(error);
serverResponse.end(resolve);
});
}
pump();
} else if (isAsyncIterable(body)) {
for await (const chunk of body as AsyncIterable<Uint8Array>) {
if (!serverResponse.write(chunk)) {
Expand Down
152 changes: 152 additions & 0 deletions packages/server/test/adapter.fetch.spec.ts
@@ -0,0 +1,152 @@
import { createServerAdapter } from '../src';
import { createTestContainer } from './create-test-container';

describe('adapter.fetch', () => {
createTestContainer(({ Request }) => {
// Request as first parameter
it('should accept Request as a first argument', async () => {
const handleRequest = jest.fn();
const adapter = createServerAdapter(handleRequest, Request);
const request = new Request('http://localhost:8080');
await adapter(request);
expect(handleRequest).toHaveBeenCalledWith(request, expect.anything());
});
it('should accept additional parameters as server context', async () => {
const handleRequest = jest.fn();
const adapter = createServerAdapter<{
foo: string;
}>(handleRequest, Request);
const request = new Request('http://localhost:8080');
const additionalCtx = { foo: 'bar' };
await adapter.fetch(request, additionalCtx);
expect(handleRequest).toHaveBeenCalledWith(request, expect.objectContaining(additionalCtx));
});
// URL as first parameter
it('should accept URL as a first argument', async () => {
const handleRequest = jest.fn();
const adapter = createServerAdapter(handleRequest, Request);
const url = new URL('http://localhost:8080');
await adapter.fetch(url);
expect(handleRequest).toHaveBeenCalledWith(
expect.objectContaining({
url: url.toString(),
}),
expect.anything()
);
});
it('should accept URL without a RequestInit but with an additional context', async () => {
const handleRequest = jest.fn();
const adapter = createServerAdapter<{
foo: string;
}>(handleRequest, Request);
const url = new URL('http://localhost:8080');
const additionalCtx = { foo: 'bar' };
await adapter.fetch(url, additionalCtx);
expect(handleRequest).toHaveBeenCalledWith(
expect.objectContaining({
url: url.toString(),
}),
expect.objectContaining(additionalCtx)
);
});
it('should accept URL with a RequestInit', async () => {
const handleRequest = jest.fn();
const adapter = createServerAdapter(handleRequest, Request);
const url = new URL('http://localhost:8080');
const init = {
method: 'POST',
};
await adapter.fetch(url, init);
expect(handleRequest).toHaveBeenCalledWith(
expect.objectContaining({
url: url.toString(),
method: init.method,
}),
expect.anything()
);
});
it('should accept URL with a RequestInit and additional parameters as server context', async () => {
const handleRequest = jest.fn();
const adapter = createServerAdapter<{
foo: string;
}>(handleRequest, Request);
const url = new URL('http://localhost:8080');
const init = {
method: 'POST',
};
const additionalCtx = { foo: 'bar' };
await adapter.fetch(url, init, additionalCtx);
expect(handleRequest).toHaveBeenCalledWith(
expect.objectContaining({
url: url.toString(),
method: init.method,
}),
expect.objectContaining(additionalCtx)
);
});

// String as first parameter
it('should accept string as a first argument', async () => {
const handleRequest = jest.fn();
const adapter = createServerAdapter(handleRequest, Request);
const url = 'http://localhost:8080/';
await adapter.fetch(url);
expect(handleRequest).toHaveBeenCalledWith(
expect.objectContaining({
url,
}),
expect.anything()
);
});
it('should accept string without a RequestInit but with an additional context', async () => {
const handleRequest = jest.fn();
const adapter = createServerAdapter<{
foo: string;
}>(handleRequest, Request);
const url = 'http://localhost:8080/';
const additionalCtx = { foo: 'bar' };
await adapter.fetch(url, additionalCtx);
expect(handleRequest).toHaveBeenCalledWith(
expect.objectContaining({
url,
}),
expect.objectContaining(additionalCtx)
);
});
it('should accept string with a RequestInit', async () => {
const handleRequest = jest.fn();
const adapter = createServerAdapter(handleRequest, Request);
const url = 'http://localhost:8080/';
const init = {
method: 'POST',
};
await adapter.fetch(url, init);
expect(handleRequest).toHaveBeenCalledWith(
expect.objectContaining({
url,
method: init.method,
}),
expect.anything()
);
});
it('should accept string with a RequestInit and additional parameters as server context', async () => {
const handleRequest = jest.fn();
const adapter = createServerAdapter<{
foo: string;
}>(handleRequest, Request);
const url = 'http://localhost:8080/';
const init = {
method: 'POST',
};
const additionalCtx = { foo: 'bar' };
await adapter.fetch(url, init, additionalCtx);
expect(handleRequest).toHaveBeenCalledWith(
expect.objectContaining({
url,
method: init.method,
}),
expect.objectContaining(additionalCtx)
);
});
});
});
16 changes: 16 additions & 0 deletions packages/server/test/create-test-container.ts
@@ -0,0 +1,16 @@
import { createFetch } from '@whatwg-node/fetch';

export function createTestContainer(
fn: (fetchAPI: ReturnType<typeof createFetch>) => void,
extraFlags: Parameters<typeof createFetch>[0] = {}
) {
['default-fetch' /*, 'node-fetch' */].forEach(fetchImplementation => {
describe(fetchImplementation, () => {
const fetchAPI = createFetch({
useNodeFetch: fetchImplementation === 'node-fetch',
...extraFlags,
});
fn(fetchAPI);
});
});
}