Skip to content

Commit

Permalink
test(core): add a sse failed test to reproduce nestjs#11601
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengjitf committed Apr 16, 2024
1 parent b483f87 commit af87b2a
Showing 1 changed file with 68 additions and 0 deletions.
68 changes: 68 additions & 0 deletions packages/core/test/router/router-response-controller.spec.ts
Expand Up @@ -7,6 +7,7 @@ import { PassThrough, Writable } from 'stream';
import { HttpStatus, RequestMethod } from '../../../common';
import { RouterResponseController } from '../../router/router-response-controller';
import { NoopHttpAdapter } from '../utils/noop-adapter.spec';
import { SseStream } from '../../router/sse-stream';

describe('RouterResponseController', () => {
let adapter: NoopHttpAdapter;
Expand Down Expand Up @@ -374,6 +375,73 @@ data: test
done();
});

describe('when writing data too densely', () => {
const DEFAULT_MAX_LISTENERS = SseStream.defaultMaxListeners;
const MAX_LISTENERS = 1;
const sandbox = sinon.createSandbox();

beforeEach(() => {
// Can't access to the internal sseStream,
// as a workround, set `defaultMaxListeners` of `SseStream` and reset the max listeners of `process`
const PROCESS_MAX_LISTENERS = process.getMaxListeners();
SseStream.defaultMaxListeners = MAX_LISTENERS;
process.setMaxListeners(PROCESS_MAX_LISTENERS);

const sseStream = sinon.createStubInstance(SseStream);
const originalWrite = SseStream.prototype.write;
// Make `.write()` always return false, so as to listen `drain` event
sseStream.write.callsFake(function (...args: any[]) {
originalWrite.apply(this, args);
return false;
});
sandbox.replace(SseStream.prototype, 'write', sseStream.write);
});

afterEach(() => {
sandbox.restore();
SseStream.defaultMaxListeners = DEFAULT_MAX_LISTENERS;
});

it('should not cause memory leak', async () => {
let maxDrainListenersExceededWarning = null;
process.on('warning', (warning: any) => {
if (
warning.name === 'MaxListenersExceededWarning' &&
warning.emitter instanceof SseStream &&
warning.type === 'drain' &&
warning.count === MAX_LISTENERS + 1
) {
maxDrainListenersExceededWarning = warning;
}
});

const result = new Subject();

const response = new Writable();
response._write = () => {};

const request = new Writable();
request._write = () => {};

routerResponseController.sse(
result,
response as unknown as ServerResponse,
request as unknown as IncomingMessage,
);

// Send multiple messages simultaneously
Array.from({ length: MAX_LISTENERS + 1 }).forEach((_, i) =>
result.next(String(i)),
);

await new Promise(resolve => process.nextTick(resolve));

expect(() => {
expect(maxDrainListenersExceededWarning).to.equal(null);
}, 'it will fail as there is an issue here to be addressed').to.throw();
});
});

describe('when there is an error', () => {
it('should close the request', done => {
const result = new Subject();
Expand Down

0 comments on commit af87b2a

Please sign in to comment.