From 3f1002226906b47ba22f32da8f624235cab4d3ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C3=ABl=20Zasso?= Date: Sun, 8 Jan 2023 15:08:41 +0000 Subject: [PATCH] use syncCommPort to exchange data too --- lib/internal/modules/esm/hooks.js | 62 +++++------------------------- lib/internal/modules/esm/worker.js | 44 +++++---------------- 2 files changed, 20 insertions(+), 86 deletions(-) diff --git a/lib/internal/modules/esm/hooks.js b/lib/internal/modules/esm/hooks.js index d5d8b3683402be..cd1895912d24d1 100644 --- a/lib/internal/modules/esm/hooks.js +++ b/lib/internal/modules/esm/hooks.js @@ -10,8 +10,6 @@ const { SafeSet, StringPrototypeSlice, StringPrototypeToUpperCase, - TypedArrayPrototypeFill, - TypedArrayPrototypeSet, globalThis, } = primordials; @@ -24,14 +22,11 @@ const { ERR_INVALID_RETURN_VALUE, } = require('internal/errors').codes; const { URL } = require('internal/url'); +const { receiveMessageOnPort } = require('worker_threads'); const { isAnyArrayBuffer, isArrayBufferView, } = require('internal/util/types'); -const { - deserialize, - serialize, -} = require('v8'); const { validateObject, validateString, @@ -445,12 +440,6 @@ class Hooks { ObjectSetPrototypeOf(Hooks.prototype, null); class HooksProxy { - /** - * The request & response segment of the shared memory. TextEncoder/Decoder (needed to convert - * requests & responses into a format supported by the comms channel) reads and writes with - * Uint8Array. - */ - #data; /** * The lock/unlock segment of the shared memory. Atomics require this to be a Int32Array. This * segment is used to tell the main to sleep when the worker is processing. @@ -473,9 +462,7 @@ class HooksProxy { const { InternalWorker } = require('internal/worker'); const { MessageChannel } = require('internal/worker/io'); - const data = new SharedArrayBuffer(204_800); const lock = new SharedArrayBuffer(4); - this.#data = new Uint8Array(data); this.#lock = new Int32Array(lock); const syncCommChannel = new MessageChannel(); this.#syncCommPort = syncCommChannel.port1; @@ -486,7 +473,6 @@ class HooksProxy { stdout: false, trackUnmanagedFds: false, workerData: { - data, lock, syncCommPort: syncCommChannel.port2, }, @@ -505,46 +491,18 @@ class HooksProxy { this.#isReady = true; } - TypedArrayPrototypeFill(this.#data, undefined); // Erase handled request/response data - const request = serialize({ method, args }); - TypedArrayPrototypeSet(this.#data, request); - - // Signal the worker that there is work to do. - this.#syncCommPort.postMessage(true); - - const chunks = []; - let done = false; - try { - while (done === false) { - this.#awaitResponse(); - const chunkContainer = deserialize(this.#data); - // `deserialize` might return an error, such as a `RangeError`, rather than throw it - if (chunkContainer instanceof Error) { throw chunkContainer; } - ({ done } = chunkContainer); - const { value } = chunkContainer; - if (!value) { throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); } - ArrayPrototypePush(chunks, value); - } - } catch (exception) { - if (this.#data[0] === 0) { // Response should not be empty - throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); - } else { - throw exception; - } - } - if (chunks.length === 0) { // Response should not be empty - throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); - } - const reassembledChunks = Buffer.concat(chunks); - const response = deserialize(reassembledChunks); - if (response instanceof Error) { throw response; } - return response; - } + // Pass work to the worker. + this.#syncCommPort.postMessage({ method, args }); - #awaitResponse() { Atomics.store(this.#lock, 0, 0); // Send request to worker - Atomics.notify(this.#lock, 0); // Notify worker of new request Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds + + const response = receiveMessageOnPort(this.#syncCommPort).message; + if (response instanceof Error) { + throw response; + } else { + return response; + } } } ObjectSetPrototypeOf(HooksProxy.prototype, null); diff --git a/lib/internal/modules/esm/worker.js b/lib/internal/modules/esm/worker.js index c4eebc1ec173aa..63128407f3c012 100644 --- a/lib/internal/modules/esm/worker.js +++ b/lib/internal/modules/esm/worker.js @@ -4,9 +4,6 @@ const { Int32Array, ReflectApply, SafeWeakMap, - TypedArrayPrototypeFill, - TypedArrayPrototypeSet, - Uint8Array, globalThis: { Atomics, }, @@ -22,7 +19,6 @@ const { workerData } = require('worker_threads'); // lock = 0 -> main sleeps // lock = 1 -> worker sleeps const lock = new Int32Array(workerData.lock); // Required by Atomics -const data = new Uint8Array(workerData.data); // For v8.deserialize/serialize const { syncCommPort } = workerData; // To receive work signals. const CHUNK_THRESHOLD = data.length - 26; // `{ done: true/false, value: … }` adds 26 bytes @@ -49,16 +45,11 @@ function releaseLock() { syncCommPort.on('message', handleSyncMessage); - // ! Put as little above this line as possible - releaseLock(); // Send 'ready' signal to main + // ! Put as little above this line as possible. + releaseLock(); // Send 'ready' signal to main. - const { deserialize, serialize } = require('v8'); - - async function handleSyncMessage() { - const { method, args } = deserialize(data); - TypedArrayPrototypeFill(data, undefined); - - // Each potential exception needs to be caught individually so that the correct error is sent to the main thread + async function handleSyncMessage({ method, args }) { + // Each potential exception needs to be caught individually so that the correct error is sent to the main thread. let response; if (initializationError) { response = initializationError; @@ -69,33 +60,18 @@ function releaseLock() { throw new ERR_INVALID_RETURN_VALUE('object', method, response) } } catch (exception) { - TypedArrayPrototypeSet(data, serialize({ - __proto__: null, - done: i === chunkCount - 1, - value: serialize(exception), - })); - releaseLock(); + response = exception } } + // Send the method response (or exception) to the main thread. try { - const serializedResponseValue = serialize(response); - const chunkCount = Math.ceil(serializedResponseValue.byteLength / CHUNK_THRESHOLD); - for (let i = 0; i < chunkCount; i++) { - const chunk = { - __proto__: null, - done: i === chunkCount - 1, - value: serializedResponseValue.slice(i, i + CHUNK_THRESHOLD), - }; - const serializedChunk = serialize(chunk); - // Send the method response (or exception) to the main thread - TypedArrayPrototypeSet(data, serializedChunk); - releaseLock(); - } + syncCommPort.postMessage(response); } catch (exception) { - TypedArrayPrototypeSet(data, serialize(exception)); - releaseLock(); + // Or send the exception thrown when trying to send the response. + syncCommPort.postMessage(exception); } + releaseLock(); } })().catch((err) => { // The triggerUncaughtException call below does not terminate the process or surface errors to the user;