From 1046464bb11bea421730c1ef7cd6544414925ca6 Mon Sep 17 00:00:00 2001 From: Erick Wendel Date: Sat, 23 Apr 2022 12:39:09 -0300 Subject: [PATCH] child_process: queue pending messages It fixes the problem of the child process not receiving messages. Fixes: https://github.com/nodejs/node/issues/41134 --- lib/internal/child_process.js | 32 ++++++++++++++++++- .../test-esm-child-process-fork-main.mjs | 20 ++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 test/es-module/test-esm-child-process-fork-main.mjs diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index 44d19966768587..510fc8b51b2f1e 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -2,8 +2,10 @@ const { ArrayIsArray, + ArrayPrototypePush, ObjectDefineProperty, ObjectSetPrototypeOf, + ReflectApply, Symbol, Uint8Array, } = primordials; @@ -73,6 +75,7 @@ let HTTPParser; const MAX_HANDLE_RETRANSMISSIONS = 3; const kChannelHandle = Symbol('kChannelHandle'); const kIsUsedAsStdio = Symbol('kIsUsedAsStdio'); +const kPendingMessages = Symbol('kPendingMessages'); // This object contain function to convert TCP objects to native handle objects // and back again. @@ -520,6 +523,7 @@ class Control extends EventEmitter { constructor(channel) { super(); this.#channel = channel; + this[kPendingMessages] = []; } // The methods keeping track of the counter are being used to track the @@ -693,6 +697,24 @@ function setupChannel(target, channel, serializationMode) { }); }); + target.on('newListener', function() { + + process.nextTick(() => { + if (!target.channel || !target.listenerCount('message')) + return; + + const messages = target.channel[kPendingMessages]; + const { length } = messages; + if (!length) return; + + for (let i = 0; i < length; i++) { + ReflectApply(target.emit, target, messages[i]); + } + + target.channel[kPendingMessages] = []; + }); + }); + target.send = function(message, handle, options, callback) { if (typeof handle === 'function') { callback = handle; @@ -909,7 +931,15 @@ function setupChannel(target, channel, serializationMode) { }; function emit(event, message, handle) { - target.emit(event, message, handle); + if ('internalMessage' === event || target.listenerCount('message')) { + target.emit(event, message, handle); + return; + } + + ArrayPrototypePush( + target.channel[kPendingMessages], + [event, message, handle] + ); } function handleMessage(message, handle, internal) { diff --git a/test/es-module/test-esm-child-process-fork-main.mjs b/test/es-module/test-esm-child-process-fork-main.mjs new file mode 100644 index 00000000000000..4dd47a2da97b09 --- /dev/null +++ b/test/es-module/test-esm-child-process-fork-main.mjs @@ -0,0 +1,20 @@ +import '../common/index.mjs'; +import assert from 'assert'; +import { fork } from 'child_process'; +import { once } from 'events'; +import { fileURLToPath } from 'url'; + +if (process.argv[2] !== 'child') { + const filename = fileURLToPath(import.meta.url); + const cp = fork(filename, ['child']); + const message = 'Hello World'; + cp.send(message); + + const [received] = await once(cp, 'message'); + assert.deepStrictEqual(received, message); + + cp.disconnect(); + await once(cp, 'exit'); +} else { + process.on('message', (msg) => process.send(msg)); +}