Skip to content

Commit

Permalink
Merge pull request #9819 from jmcdo29/fix/pipeline-over-pipe
Browse files Browse the repository at this point in the history
fix: use pipeline over stream.pipe
  • Loading branch information
kamilmysliwiec committed Jul 20, 2022
2 parents 4eacd87 + 9739aa6 commit 429dfa1
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 1 deletion.
3 changes: 3 additions & 0 deletions integration/send-files/e2e/express.spec.ts
Expand Up @@ -65,4 +65,7 @@ describe('Express FileSend', () => {
expect(res.text).to.be.eq(readmeString);
});
});
it('should return an error if the file does not exist', async () => {
return request(app.getHttpServer()).get('/file/not/exist').expect(400);
});
});
5 changes: 5 additions & 0 deletions integration/send-files/src/app.controller.ts
Expand Up @@ -31,4 +31,9 @@ export class AppController {
getFileWithHeaders(): StreamableFile {
return this.appService.getFileWithHeaders();
}

@Get('file/not/exist')
getNonExistantFile(): StreamableFile {
return this.appService.getFileThatDoesNotExist();
}
}
4 changes: 4 additions & 0 deletions integration/send-files/src/app.service.ts
Expand Up @@ -35,4 +35,8 @@ export class AppService {
},
);
}

getFileThatDoesNotExist(): StreamableFile {
return new StreamableFile(createReadStream('does-not-exist.txt'));
}
}
27 changes: 27 additions & 0 deletions packages/common/file-stream/streamable-file.ts
Expand Up @@ -3,9 +3,22 @@ import { types } from 'util';
import { isFunction } from '../utils/shared.utils';
import { StreamableFileOptions } from './streamable-options.interface';

export interface StreamableHandlerResponse {
statusCode: number;
send: (msg: string) => void;
}

export class StreamableFile {
private readonly stream: Readable;

protected handleError: (
err: Error,
response: StreamableHandlerResponse,
) => void = (err: Error, res) => {
res.statusCode = 400;
res.send(err.message);
};

constructor(buffer: Uint8Array, options?: StreamableFileOptions);
constructor(readable: Readable, options?: StreamableFileOptions);
constructor(
Expand Down Expand Up @@ -38,4 +51,18 @@ export class StreamableFile {
length,
};
}

get errorHandler(): (
err: Error,
response: StreamableHandlerResponse,
) => void {
return this.handleError;
}

setErrorHandler(
handler: (err: Error, response: StreamableHandlerResponse) => void,
) {
this.handleError = handler;
return this;
}
}
15 changes: 14 additions & 1 deletion packages/platform-express/adapters/express-adapter.ts
@@ -1,5 +1,6 @@
import {
InternalServerErrorException,
Logger,
RawBodyRequest,
RequestMethod,
StreamableFile,
Expand Down Expand Up @@ -32,6 +33,7 @@ import * as cors from 'cors';
import * as express from 'express';
import * as http from 'http';
import * as https from 'https';
import { PassThrough, pipeline } from 'stream';
import { ServeStaticOptions } from '../interfaces/serve-static-options.interface';
import { getBodyParserOptions } from './utils/get-body-parser-options.util';

Expand All @@ -46,6 +48,7 @@ type VersionedRoute = <

export class ExpressAdapter extends AbstractHttpAdapter {
private readonly routerMethodFactory = new RouterMethodFactory();
private readonly logger = new Logger(ExpressAdapter.name);

constructor(instance?: any) {
super(instance || express());
Expand Down Expand Up @@ -78,7 +81,17 @@ export class ExpressAdapter extends AbstractHttpAdapter {
) {
response.setHeader('Content-Length', streamHeaders.length);
}
return body.getStream().pipe(response);
return pipeline(
body.getStream().once('error', (err: Error) => {
body.errorHandler(err, response);
}),
response,
(err: Error) => {
if (err) {
this.logger.error(err.message, err.stack);
}
},
);
}
return isObject(body) ? response.json(body) : response.send(String(body));
}
Expand Down

0 comments on commit 429dfa1

Please sign in to comment.