From 18c8684faad0533b4af5e0e1f330f8959f3c3568 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ari=20Perkki=C3=B6?= Date: Mon, 25 Mar 2024 18:47:59 +0200 Subject: [PATCH] fix: prevent hang when `process` is overwritten (#83) --- src/entry/process.ts | 11 +++++++---- src/entry/worker.ts | 5 +++-- test/globals.test.ts | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 6 deletions(-) create mode 100644 test/globals.test.ts diff --git a/src/entry/process.ts b/src/entry/process.ts index 39f0295..f801213 100644 --- a/src/entry/process.ts +++ b/src/entry/process.ts @@ -23,6 +23,9 @@ process.__tinypool_state__ = { workerId: Number(process.env.TINYPOOL_WORKER_ID), } +const memoryUsage = process.memoryUsage.bind(process) +const send = process.send!.bind(process) + process.on('message', (message: IncomingMessage) => { // Message was not for port or pool // It's likely end-users own communication between main and worker @@ -36,7 +39,7 @@ process.on('message', (message: IncomingMessage) => { await getHandler(filename, name) } - process.send!({ + send!({ ready: true, source: 'pool', __tinypool_worker_message__: true, @@ -69,7 +72,7 @@ async function onMessage(message: IncomingMessage & { source: 'port' }) { taskId, result, error: null, - usedMemory: process.memoryUsage().heapUsed, + usedMemory: memoryUsage().heapUsed, } // If the task used e.g. console.log(), wait for the stream to drain @@ -89,11 +92,11 @@ async function onMessage(message: IncomingMessage & { source: 'port' }) { taskId, result: null, error: serializeError(error), - usedMemory: process.memoryUsage().heapUsed, + usedMemory: memoryUsage().heapUsed, } } - process.send!(response) + send!(response) } function serializeError(error: unknown) { diff --git a/src/entry/worker.ts b/src/entry/worker.ts index 79bdd99..ef11660 100644 --- a/src/entry/worker.ts +++ b/src/entry/worker.ts @@ -28,6 +28,7 @@ process.__tinypool_state__ = { workerId: tinypoolPrivateData.workerId, } +const memoryUsage = process.memoryUsage.bind(process) let useAtomics: boolean = process.env.PISCINA_DISABLE_ATOMICS !== '1' // We should only receive this message once, when the Worker starts. It gives @@ -110,7 +111,7 @@ function onMessage( taskId, result: result, error: null, - usedMemory: process.memoryUsage().heapUsed, + usedMemory: memoryUsage().heapUsed, } // If the task used e.g. console.log(), wait for the stream to drain @@ -130,7 +131,7 @@ function onMessage( // It may be worth taking a look at the error cloning algorithm we // use in Node.js core here, it's quite a bit more flexible error, - usedMemory: process.memoryUsage().heapUsed, + usedMemory: memoryUsage().heapUsed, } } currentTasks-- diff --git a/test/globals.test.ts b/test/globals.test.ts new file mode 100644 index 0000000..257a128 --- /dev/null +++ b/test/globals.test.ts @@ -0,0 +1,32 @@ +import * as path from 'path' +import { fileURLToPath } from 'url' +import { Tinypool } from 'tinypool' + +const __dirname = path.dirname(fileURLToPath(import.meta.url)) + +describe.each(['worker_threads', 'child_process'] as const)('%s', (runtime) => { + test("doesn't hang when process is overwritten", async () => { + const pool = createPool({ runtime }) + + const result = await pool.run(` + (async () => { + return new Promise(resolve => { + globalThis.process = { exit: resolve }; + process.exit("exit() from overwritten process"); + }); + })(); + `) + expect(result).toBe('exit() from overwritten process') + }) +}) + +function createPool(options: Partial) { + const pool = new Tinypool({ + filename: path.resolve(__dirname, 'fixtures/eval.js'), + minThreads: 1, + maxThreads: 1, + ...options, + }) + + return pool +}