From 415cbe707220a3eca70a6a9fda7eb406af2786e1 Mon Sep 17 00:00:00 2001 From: theanarkh Date: Wed, 11 Jan 2023 00:10:23 +0800 Subject: [PATCH] src,lib: the handle keeps loop alive in cluster rr mode --- lib/internal/cluster/child.js | 33 +++++++++++++++---- test/parallel/test-cluster-rr-handle-close.js | 18 ++++++++++ .../test-cluster-rr-handle-keep-loop-alive.js | 23 +++++++++++++ .../test-cluster-rr-handle-ref-unref.js | 20 +++++++++++ 4 files changed, 87 insertions(+), 7 deletions(-) create mode 100644 test/parallel/test-cluster-rr-handle-close.js create mode 100644 test/parallel/test-cluster-rr-handle-keep-loop-alive.js create mode 100644 test/parallel/test-cluster-rr-handle-ref-unref.js diff --git a/lib/internal/cluster/child.js b/lib/internal/cluster/child.js index 48e81d8744310b..2837c15fb8a479 100644 --- a/lib/internal/cluster/child.js +++ b/lib/internal/cluster/child.js @@ -16,6 +16,8 @@ const { owner_symbol } = require('internal/async_hooks').symbols; const Worker = require('internal/cluster/worker'); const { internal, sendHelper } = require('internal/cluster/utils'); const { exitCodes: { kNoFailure } } = internalBinding('errors'); +const { TIMEOUT_MAX } = require('internal/timers'); +const { setInterval, clearInterval } = require('timers'); const cluster = new EventEmitter(); const handles = new SafeMap(); @@ -162,6 +164,21 @@ function rr(message, { indexesKey, index }, cb) { let key = message.key; + let fakeHandle = null; + + function ref() { + if (!fakeHandle) { + fakeHandle = setInterval(noop, TIMEOUT_MAX); + } + } + + function unref() { + if (fakeHandle) { + clearInterval(fakeHandle); + fakeHandle = null; + } + } + function listen(backlog) { // TODO(bnoordhuis) Send a message to the primary that tells it to // update the backlog size. The actual backlog should probably be @@ -177,7 +194,11 @@ function rr(message, { indexesKey, index }, cb) { // the primary. if (key === undefined) return; - + unref(); + // If the handle is the last handle in process, + // the parent process will delete the handle when worker process exits. + // So it is ok if the close message get lost. + // See the comments of https://github.com/nodejs/node/pull/46161 send({ act: 'close', key }); handles.delete(key); removeIndexesKey(indexesKey, index); @@ -191,12 +212,10 @@ function rr(message, { indexesKey, index }, cb) { return 0; } - // Faux handle. Mimics a TCPWrap with just enough fidelity to get away - // with it. Fools net.Server into thinking that it's backed by a real - // handle. Use a noop function for ref() and unref() because the control - // channel is going to keep the worker alive anyway. - const handle = { close, listen, ref: noop, unref: noop }; - + // Faux handle. net.Server is not associated with handle, + // so we control its state(ref or unref) by setInterval. + const handle = { close, listen, ref, unref }; + handle.ref(); if (message.sockname) { handle.getsockname = getsockname; // TCP handles only. } diff --git a/test/parallel/test-cluster-rr-handle-close.js b/test/parallel/test-cluster-rr-handle-close.js new file mode 100644 index 00000000000000..fb8e9740d665b7 --- /dev/null +++ b/test/parallel/test-cluster-rr-handle-close.js @@ -0,0 +1,18 @@ +'use strict'; + +const common = require('../common'); +const cluster = require('cluster'); +const net = require('net'); + +cluster.schedulingPolicy = cluster.SCHED_RR; + +if (cluster.isPrimary) { + const worker = cluster.fork(); + worker.on('exit', common.mustCall()); +} else { + const server = net.createServer(common.mustNotCall()); + server.listen(0, common.mustCall(() => { + process.channel.unref(); + server.close(); + })); +} diff --git a/test/parallel/test-cluster-rr-handle-keep-loop-alive.js b/test/parallel/test-cluster-rr-handle-keep-loop-alive.js new file mode 100644 index 00000000000000..0b18408a192ba1 --- /dev/null +++ b/test/parallel/test-cluster-rr-handle-keep-loop-alive.js @@ -0,0 +1,23 @@ +'use strict'; + +const common = require('../common'); +const cluster = require('cluster'); +const net = require('net'); +const assert = require('assert'); + +cluster.schedulingPolicy = cluster.SCHED_RR; + +if (cluster.isPrimary) { + let exited = false; + const worker = cluster.fork(); + worker.on('exit', () => { + exited = true; + }); + setTimeout(() => { + assert.ok(!exited); + worker.kill(); + }, 3000); +} else { + const server = net.createServer(common.mustNotCall()); + server.listen(0, common.mustCall(() => process.channel.unref())); +} diff --git a/test/parallel/test-cluster-rr-handle-ref-unref.js b/test/parallel/test-cluster-rr-handle-ref-unref.js new file mode 100644 index 00000000000000..403bbefd4dd69b --- /dev/null +++ b/test/parallel/test-cluster-rr-handle-ref-unref.js @@ -0,0 +1,20 @@ +'use strict'; + +const common = require('../common'); +const cluster = require('cluster'); +const net = require('net'); + +cluster.schedulingPolicy = cluster.SCHED_RR; + +if (cluster.isPrimary) { + const worker = cluster.fork(); + worker.on('exit', common.mustCall()); +} else { + const server = net.createServer(common.mustNotCall()); + server.listen(0, common.mustCall(() => { + server.ref(); + server.unref(); + process.channel.unref(); + })); + server.unref(); +}