From 18c27cd7904d7edf2e6b3715b012663f65e511ba Mon Sep 17 00:00:00 2001 From: Jay McDoniel Date: Tue, 21 Jun 2022 16:15:26 -0700 Subject: [PATCH 1/4] fix: use pipeline over stream.pipe `pipeline` ends up destroying streams used if there is an error in one of the streams. Due to this, there's no chance of a memory leak from errored out streams. There's also now an addition of adding an error handler to the `StreamableFile` so that stream errors by default return a 400 and can be customized to return an error message however a developer would like. These only effect the express adapter because of how Fastify already internally handles streams. fix: #9759 --- integration/send-files/e2e/express.spec.ts | 3 +++ integration/send-files/src/app.controller.ts | 5 ++++ integration/send-files/src/app.service.ts | 4 +++ .../common/file-stream/streamable-file.ts | 27 +++++++++++++++++++ .../adapters/express-adapter.ts | 14 +++++++++- 5 files changed, 52 insertions(+), 1 deletion(-) diff --git a/integration/send-files/e2e/express.spec.ts b/integration/send-files/e2e/express.spec.ts index 1657d137c39..f95e923389b 100644 --- a/integration/send-files/e2e/express.spec.ts +++ b/integration/send-files/e2e/express.spec.ts @@ -65,4 +65,7 @@ describe('Express FileSend', () => { expect(res.text).to.be.eq(readmeString); }); }); + it('should return an error if the file does not exist', async () => { + return request(app.getHttpServer()).get('/file/not/exist').expect(400); + }); }); diff --git a/integration/send-files/src/app.controller.ts b/integration/send-files/src/app.controller.ts index 606e2d5980c..9715784bdbf 100644 --- a/integration/send-files/src/app.controller.ts +++ b/integration/send-files/src/app.controller.ts @@ -31,4 +31,9 @@ export class AppController { getFileWithHeaders(): StreamableFile { return this.appService.getFileWithHeaders(); } + + @Get('file/not/exist') + getNonExistantFile(): StreamableFile { + return this.appService.getFileThatDoesNotExist(); + } } diff --git a/integration/send-files/src/app.service.ts b/integration/send-files/src/app.service.ts index ba5849a754f..af94bf5b7f1 100644 --- a/integration/send-files/src/app.service.ts +++ b/integration/send-files/src/app.service.ts @@ -35,4 +35,8 @@ export class AppService { }, ); } + + getFileThatDoesNotExist(): StreamableFile { + return new StreamableFile(createReadStream('does-not-exist.txt')); + } } diff --git a/packages/common/file-stream/streamable-file.ts b/packages/common/file-stream/streamable-file.ts index 7fe05c1d902..f8d79c3af69 100644 --- a/packages/common/file-stream/streamable-file.ts +++ b/packages/common/file-stream/streamable-file.ts @@ -3,9 +3,22 @@ import { types } from 'util'; import { isFunction } from '../utils/shared.utils'; import { StreamableFileOptions } from './streamable-options.interface'; +interface StreamableHandlerResponse { + statusCode: number; + send: (msg: string) => void; +} + export class StreamableFile { private readonly stream: Readable; + private handler: (err: Error, response: StreamableHandlerResponse) => void = ( + err: Error, + res, + ) => { + res.statusCode = 400; + res.send(err.message); + }; + constructor(buffer: Uint8Array, options?: StreamableFileOptions); constructor(readable: Readable, options?: StreamableFileOptions); constructor( @@ -38,4 +51,18 @@ export class StreamableFile { length, }; } + + get errorHandler(): ( + err: Error, + response: StreamableHandlerResponse, + ) => void { + return this.handler; + } + + setErrorHandler( + handler: (err: Error, response: StreamableHandlerResponse) => void, + ) { + this.handler = handler; + return this; + } } diff --git a/packages/platform-express/adapters/express-adapter.ts b/packages/platform-express/adapters/express-adapter.ts index 609e51459ba..23263452401 100644 --- a/packages/platform-express/adapters/express-adapter.ts +++ b/packages/platform-express/adapters/express-adapter.ts @@ -1,5 +1,6 @@ import { InternalServerErrorException, + Logger, RawBodyRequest, RequestMethod, StreamableFile, @@ -33,6 +34,7 @@ import * as cors from 'cors'; import * as express from 'express'; import * as http from 'http'; import * as https from 'https'; +import { PassThrough, pipeline } from 'stream'; import { ServeStaticOptions } from '../interfaces/serve-static-options.interface'; type VersionedRoute = < @@ -78,7 +80,17 @@ export class ExpressAdapter extends AbstractHttpAdapter { ) { response.setHeader('Content-Length', streamHeaders.length); } - return body.getStream().pipe(response); + return pipeline( + body.getStream().on('error', (err: Error) => { + body.errorHandler(err, response); + }), + response, + (err: Error) => { + if (err) { + new Logger('ExpressAdapter').error(err.message, err.stack); + } + }, + ); } return isObject(body) ? response.json(body) : response.send(String(body)); } From f59cf5e81ca73bcdf1b5b36713550fd93918db41 Mon Sep 17 00:00:00 2001 From: Jay McDoniel Date: Sun, 17 Jul 2022 14:56:45 -0700 Subject: [PATCH 2/4] feat: update handler to be protected and only handle errors once --- packages/common/file-stream/streamable-file.ts | 14 ++++++-------- .../platform-express/adapters/express-adapter.ts | 2 +- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/common/file-stream/streamable-file.ts b/packages/common/file-stream/streamable-file.ts index f8d79c3af69..5dd0285ca5f 100644 --- a/packages/common/file-stream/streamable-file.ts +++ b/packages/common/file-stream/streamable-file.ts @@ -3,7 +3,7 @@ import { types } from 'util'; import { isFunction } from '../utils/shared.utils'; import { StreamableFileOptions } from './streamable-options.interface'; -interface StreamableHandlerResponse { +export interface StreamableHandlerResponse { statusCode: number; send: (msg: string) => void; } @@ -11,13 +11,11 @@ interface StreamableHandlerResponse { export class StreamableFile { private readonly stream: Readable; - private handler: (err: Error, response: StreamableHandlerResponse) => void = ( - err: Error, - res, - ) => { - res.statusCode = 400; - res.send(err.message); - }; + protected handler: (err: Error, response: StreamableHandlerResponse) => void = + (err: Error, res) => { + res.statusCode = 400; + res.send(err.message); + }; constructor(buffer: Uint8Array, options?: StreamableFileOptions); constructor(readable: Readable, options?: StreamableFileOptions); diff --git a/packages/platform-express/adapters/express-adapter.ts b/packages/platform-express/adapters/express-adapter.ts index 23263452401..b8606cffbff 100644 --- a/packages/platform-express/adapters/express-adapter.ts +++ b/packages/platform-express/adapters/express-adapter.ts @@ -81,7 +81,7 @@ export class ExpressAdapter extends AbstractHttpAdapter { response.setHeader('Content-Length', streamHeaders.length); } return pipeline( - body.getStream().on('error', (err: Error) => { + body.getStream().once('error', (err: Error) => { body.errorHandler(err, response); }), response, From 248596ba5954003c96a60ca36c6427886eae9bd5 Mon Sep 17 00:00:00 2001 From: Jay McDoniel Date: Sun, 17 Jul 2022 17:38:19 -0700 Subject: [PATCH 3/4] fix: rename handler to to better represent what it is handling --- packages/common/file-stream/streamable-file.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/common/file-stream/streamable-file.ts b/packages/common/file-stream/streamable-file.ts index 5dd0285ca5f..f57b857ee69 100644 --- a/packages/common/file-stream/streamable-file.ts +++ b/packages/common/file-stream/streamable-file.ts @@ -11,11 +11,13 @@ export interface StreamableHandlerResponse { export class StreamableFile { private readonly stream: Readable; - protected handler: (err: Error, response: StreamableHandlerResponse) => void = - (err: Error, res) => { - res.statusCode = 400; - res.send(err.message); - }; + protected handleError: ( + err: Error, + response: StreamableHandlerResponse, + ) => void = (err: Error, res) => { + res.statusCode = 400; + res.send(err.message); + }; constructor(buffer: Uint8Array, options?: StreamableFileOptions); constructor(readable: Readable, options?: StreamableFileOptions); @@ -54,13 +56,13 @@ export class StreamableFile { err: Error, response: StreamableHandlerResponse, ) => void { - return this.handler; + return this.handleError; } setErrorHandler( handler: (err: Error, response: StreamableHandlerResponse) => void, ) { - this.handler = handler; + this.handleError = handler; return this; } } From 9739aa6a1af44e1f76a44366fbbcdad1aaddc392 Mon Sep 17 00:00:00 2001 From: Jay McDoniel Date: Mon, 18 Jul 2022 07:56:45 -0700 Subject: [PATCH 4/4] fix: move logger to class member --- packages/platform-express/adapters/express-adapter.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/platform-express/adapters/express-adapter.ts b/packages/platform-express/adapters/express-adapter.ts index b8606cffbff..605eda024fa 100644 --- a/packages/platform-express/adapters/express-adapter.ts +++ b/packages/platform-express/adapters/express-adapter.ts @@ -48,6 +48,7 @@ type VersionedRoute = < export class ExpressAdapter extends AbstractHttpAdapter { private readonly routerMethodFactory = new RouterMethodFactory(); + private readonly logger = new Logger(ExpressAdapter.name); constructor(instance?: any) { super(instance || express()); @@ -87,7 +88,7 @@ export class ExpressAdapter extends AbstractHttpAdapter { response, (err: Error) => { if (err) { - new Logger('ExpressAdapter').error(err.message, err.stack); + this.logger.error(err.message, err.stack); } }, );