From 0f8b5e95e23db159e5ed2cd376e34e3f737f57f1 Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Thu, 11 Apr 2024 10:34:58 +0200 Subject: [PATCH] [fix] Emit at most one event per event loop iteration Fixes #2216 --- lib/receiver.js | 42 +++--------------------------------------- test/receiver.test.js | 28 ++++++++++++++++++---------- test/websocket.test.js | 6 ++---- 3 files changed, 23 insertions(+), 53 deletions(-) diff --git a/lib/receiver.js b/lib/receiver.js index 9e87d811f..3c6cf1f44 100644 --- a/lib/receiver.js +++ b/lib/receiver.js @@ -13,13 +13,6 @@ const { concat, toArrayBuffer, unmask } = require('./buffer-util'); const { isValidStatusCode, isValidUTF8 } = require('./validation'); const FastBuffer = Buffer[Symbol.species]; -const promise = Promise.resolve(); - -// -// `queueMicrotask()` is not available in Node.js < 11. -// -const queueTask = - typeof queueMicrotask === 'function' ? queueMicrotask : queueMicrotaskShim; const GET_INFO = 0; const GET_PAYLOAD_LENGTH_16 = 1; @@ -577,7 +570,7 @@ class Receiver extends Writable { this._state = GET_INFO; } else { this._state = DEFER_EVENT; - queueTask(() => { + setImmediate(() => { this.emit('message', data, true); this._state = GET_INFO; this.startLoop(cb); @@ -604,7 +597,7 @@ class Receiver extends Writable { this._state = GET_INFO; } else { this._state = DEFER_EVENT; - queueTask(() => { + setImmediate(() => { this.emit('message', buf, false); this._state = GET_INFO; this.startLoop(cb); @@ -675,7 +668,7 @@ class Receiver extends Writable { this._state = GET_INFO; } else { this._state = DEFER_EVENT; - queueTask(() => { + setImmediate(() => { this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data); this._state = GET_INFO; this.startLoop(cb); @@ -711,32 +704,3 @@ class Receiver extends Writable { } module.exports = Receiver; - -/** - * A shim for `queueMicrotask()`. - * - * @param {Function} cb Callback - */ -function queueMicrotaskShim(cb) { - promise.then(cb).catch(throwErrorNextTick); -} - -/** - * Throws an error. - * - * @param {Error} err The error to throw - * @private - */ -function throwError(err) { - throw err; -} - -/** - * Throws an error in the next tick. - * - * @param {Error} err The error to throw - * @private - */ -function throwErrorNextTick(err) { - process.nextTick(throwError, err); -} diff --git a/test/receiver.test.js b/test/receiver.test.js index a88f29b9a..f3a0fa645 100644 --- a/test/receiver.test.js +++ b/test/receiver.test.js @@ -1085,17 +1085,21 @@ describe('Receiver', () => { receiver.write(Buffer.from([0x88, 0x03, 0x03, 0xe8, 0xf8])); }); - it('emits at most one event per microtask', (done) => { + it('emits at most one event per event loop iteration', (done) => { const actual = []; const expected = [ '1', - 'microtask 1', + '- 1', + '-- 1', '2', - 'microtask 2', + '- 2', + '-- 2', '3', - 'microtask 3', + '- 3', + '-- 3', '4', - 'microtask 4' + '- 4', + '-- 4' ]; function listener(data) { @@ -1104,12 +1108,16 @@ describe('Receiver', () => { // `queueMicrotask()` is not available in Node.js < 11. Promise.resolve().then(() => { - actual.push(`microtask ${message}`); + actual.push(`- ${message}`); - if (actual.length === 8) { - assert.deepStrictEqual(actual, expected); - done(); - } + Promise.resolve().then(() => { + actual.push(`-- ${message}`); + + if (actual.length === 12) { + assert.deepStrictEqual(actual, expected); + done(); + } + }); }); } diff --git a/test/websocket.test.js b/test/websocket.test.js index e1b3bd239..d27457d6c 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -4205,8 +4205,7 @@ describe('WebSocket', () => { if (messages.push(message.toString()) > 1) return; - // `queueMicrotask()` is not available in Node.js < 11. - Promise.resolve().then(() => { + setImmediate(() => { process.nextTick(() => { assert.strictEqual(ws._receiver._state, 5); ws.close(1000); @@ -4456,8 +4455,7 @@ describe('WebSocket', () => { if (messages.push(message.toString()) > 1) return; - // `queueMicrotask()` is not available in Node.js < 11. - Promise.resolve().then(() => { + setImmediate(() => { process.nextTick(() => { assert.strictEqual(ws._receiver._state, 5); ws.terminate();