Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit at most one event per event loop iteration #2218

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 4 additions & 45 deletions lib/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ const { concat, toArrayBuffer, unmask } = require('./buffer-util');
const { isValidStatusCode, isValidUTF8 } = require('./validation');

const FastBuffer = Buffer[Symbol.species];
const promise = Promise.resolve();

//
// `queueMicrotask()` is not available in Node.js < 11.
//
const queueTask =
typeof queueMicrotask === 'function' ? queueMicrotask : queueMicrotaskShim;

const GET_INFO = 0;
const GET_PAYLOAD_LENGTH_16 = 1;
Expand Down Expand Up @@ -567,17 +560,12 @@ class Receiver extends Writable {
data = fragments;
}

//
// If the state is `INFLATING`, it means that the frame data was
// decompressed asynchronously, so there is no need to defer the event
// as it will be emitted asynchronously anyway.
//
if (this._state === INFLATING || this._allowSynchronousEvents) {
if (this._allowSynchronousEvents) {
this.emit('message', data, true);
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit('message', data, true);
this._state = GET_INFO;
this.startLoop(cb);
Expand All @@ -604,7 +592,7 @@ class Receiver extends Writable {
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit('message', buf, false);
this._state = GET_INFO;
this.startLoop(cb);
Expand Down Expand Up @@ -675,7 +663,7 @@ class Receiver extends Writable {
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
this._state = GET_INFO;
this.startLoop(cb);
Expand Down Expand Up @@ -711,32 +699,3 @@ class Receiver extends Writable {
}

module.exports = Receiver;

/**
* A shim for `queueMicrotask()`.
*
* @param {Function} cb Callback
*/
function queueMicrotaskShim(cb) {
promise.then(cb).catch(throwErrorNextTick);
}

/**
* Throws an error.
*
* @param {Error} err The error to throw
* @private
*/
function throwError(err) {
throw err;
}

/**
* Throws an error in the next tick.
*
* @param {Error} err The error to throw
* @private
*/
function throwErrorNextTick(err) {
process.nextTick(throwError, err);
}
28 changes: 18 additions & 10 deletions test/receiver.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1085,17 +1085,21 @@ describe('Receiver', () => {
receiver.write(Buffer.from([0x88, 0x03, 0x03, 0xe8, 0xf8]));
});

it('emits at most one event per microtask', (done) => {
it('emits at most one event per event loop iteration', (done) => {
const actual = [];
const expected = [
'1',
'microtask 1',
'- 1',
'-- 1',
'2',
'microtask 2',
'- 2',
'-- 2',
'3',
'microtask 3',
'- 3',
'-- 3',
'4',
'microtask 4'
'- 4',
'-- 4'
];

function listener(data) {
Expand All @@ -1104,12 +1108,16 @@ describe('Receiver', () => {

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
actual.push(`microtask ${message}`);
actual.push(`- ${message}`);

if (actual.length === 8) {
assert.deepStrictEqual(actual, expected);
done();
}
Promise.resolve().then(() => {
actual.push(`-- ${message}`);

if (actual.length === 12) {
assert.deepStrictEqual(actual, expected);
done();
}
});
});
}

Expand Down
6 changes: 2 additions & 4 deletions test/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4205,8 +4205,7 @@ describe('WebSocket', () => {

if (messages.push(message.toString()) > 1) return;

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
setImmediate(() => {
process.nextTick(() => {
assert.strictEqual(ws._receiver._state, 5);
ws.close(1000);
Expand Down Expand Up @@ -4456,8 +4455,7 @@ describe('WebSocket', () => {

if (messages.push(message.toString()) > 1) return;

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
setImmediate(() => {
process.nextTick(() => {
assert.strictEqual(ws._receiver._state, 5);
ws.terminate();
Expand Down