diff --git a/lib/internal/modules/esm/hooks.js b/lib/internal/modules/esm/hooks.js index c94912afa2edd9..8003d4fdffc2be 100644 --- a/lib/internal/modules/esm/hooks.js +++ b/lib/internal/modules/esm/hooks.js @@ -10,8 +10,6 @@ const { SafeSet, StringPrototypeSlice, StringPrototypeToUpperCase, - TypedArrayPrototypeSet, - TypedArrayPrototypeSlice, globalThis, } = primordials; @@ -24,14 +22,11 @@ const { ERR_INVALID_RETURN_VALUE, } = require('internal/errors').codes; const { URL } = require('internal/url'); +const { receiveMessageOnPort } = require('worker_threads'); const { isAnyArrayBuffer, isArrayBufferView, } = require('internal/util/types'); -const { - deserialize, - serialize, -} = require('v8'); const { validateObject, validateString, @@ -457,16 +452,6 @@ class HooksProxy { */ #syncCommPort; - #done; - #chunkLength; - - /** - * The request & response segment of the shared memory. TextEncoder/Decoder (needed to convert - * requests & responses into a format supported by the comms channel) reads and writes with - * Uint8Array. - */ - #data; - #isReady = false; /** @@ -478,13 +463,7 @@ class HooksProxy { const { MessageChannel } = require('internal/worker/io'); const lock = new SharedArrayBuffer(4); // Signal to tell the other thread to sleep or wake - const done = new SharedArrayBuffer(1); // For chunking, to know whether the last chunk has been sent - const chunkLength = new SharedArrayBuffer(32); // For chunking, to know the length of the current chunk - const data = new SharedArrayBuffer(2048); // The data for the request and response this.#lock = new Int32Array(lock); - this.#done = new Uint8Array(done); - this.#chunkLength = new Uint8Array(chunkLength); - this.#data = new Uint8Array(data); const syncCommChannel = new MessageChannel(); this.#syncCommPort = syncCommChannel.port1; @@ -495,9 +474,6 @@ class HooksProxy { trackUnmanagedFds: false, workerData: { lock, - done, - chunkLength, - data, syncCommPort: syncCommChannel.port2, }, transferList: [syncCommChannel.port2], @@ -515,50 +491,22 @@ class HooksProxy { this.#isReady = true; } - const request = serialize({ method, args }); - TypedArrayPrototypeSet(this.#data, request); - TypedArrayPrototypeSet(this.#chunkLength, serialize(request.byteLength)); - TypedArrayPrototypeSet(this.#done, [1]); - - const chunks = []; - let done = false; - while (done === false) { - this.#awaitResponse(); + // Pass work to the worker. + this.#syncCommPort.postMessage({ method, args }); - try { - var chunkLength = deserialize(this.#chunkLength); - } catch (err) { - throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); - } - if (!chunkLength) { throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); } - - const chunk = TypedArrayPrototypeSlice(this.#data, 0, chunkLength); - if (!chunk) { throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); } - - ArrayPrototypePush(chunks, chunk); - if (this.#done[0] === 1) { done = true; } - } - if (chunks.length === 0) { // Response should not be empty - throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); - } - const reassembledChunks = Buffer.concat(chunks); - const response = deserialize(reassembledChunks); + Atomics.store(this.#lock, 0, 0); // Reset lock. + Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds. + const response = receiveMessageOnPort(this.#syncCommPort).message; if (response instanceof Error) { // An exception was thrown in the worker thread; re-throw to crash the process const { triggerUncaughtException } = internalBinding('errors'); triggerUncaughtException(response); + } else if (!response) { + throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); + } else { + return response; } - - return response; - } - - #awaitResponse() { - // Signal the worker that there is work to do. - this.#syncCommPort.postMessage(true); - - Atomics.store(this.#lock, 0, 0); // Reset lock. - Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds. } } ObjectSetPrototypeOf(HooksProxy.prototype, null); diff --git a/lib/internal/modules/esm/worker.js b/lib/internal/modules/esm/worker.js index 352107317e0301..7290e0ffb71c12 100644 --- a/lib/internal/modules/esm/worker.js +++ b/lib/internal/modules/esm/worker.js @@ -4,9 +4,6 @@ const { Int32Array, ReflectApply, SafeWeakMap, - TypedArrayPrototypeSet, - TypedArrayPrototypeSlice, - Uint8Array, globalThis: { Atomics, }, @@ -24,9 +21,6 @@ const { workerData } = require('worker_threads'); // lock = 0 -> main sleeps // lock = 1 -> worker sleeps const lock = new Int32Array(workerData.lock); // Required by Atomics -const done = new Uint8Array(workerData.done); // Coordinate chunks between main and worker -const chunkLength = new Uint8Array(workerData.chunkLength); // Coordinate chunks between main and worker -const data = new Uint8Array(workerData.data); // Chunks content const { syncCommPort } = workerData; // To receive work signals. function releaseLock() { @@ -52,54 +46,48 @@ function releaseLock() { syncCommPort.on('message', handleSyncMessage); - // ! Put as little above this line as possible - releaseLock(); // Send 'ready' signal to main + // ! Put as little above this line as possible. + releaseLock(); // Send 'ready' signal to main. - // Preserve state across iterations of the loop so that we can return responses in chunks - let serializedResponse, chunksCount, chunksSent = 0; - async function handleSyncMessage() { + async function handleSyncMessage({ method, args }) { + // Each potential exception needs to be caught individually so that the correct error is sent to the main thread. + let response; if (initializationError) { - serializedResponse = serialize(initializationError); - chunksCount = 1; - } else if (done[0] !== 0) { // Not currently sending chunks to main thread; process new request - const requestLength = deserialize(chunkLength); - const { method, args } = deserialize(data.slice(0, requestLength)); + response = initializationError; + } else { if (!hooks[method]) { throw new ERR_INVALID_ARG_VALUE('method', method); } - const response = await ReflectApply(hooks[method], hooks, args); - if (!response) { - throw new ERR_INVALID_RETURN_VALUE('object', method, response) + try { + response = await ReflectApply(hooks[method], hooks, args); + if (!response) { + throw new ERR_INVALID_RETURN_VALUE('object', method, response); + } + } catch (exception) { + response = exception; } - - serializedResponse = serialize(response); - chunksCount = Math.ceil(serializedResponse.byteLength / data.length); - chunksSent = 0; } - const startIndex = chunksSent * data.length; - const endIndex = startIndex + data.length; - const chunk = TypedArrayPrototypeSlice(serializedResponse, startIndex, endIndex); - const isLastChunk = chunksSent === chunksCount - 1; - TypedArrayPrototypeSet(data, chunk); - TypedArrayPrototypeSet(chunkLength, serialize(chunk.byteLength)); - TypedArrayPrototypeSet(done, isLastChunk ? [1] : [0]); - if (isLastChunk) { - serializedResponse = undefined; - chunksCount = undefined; - chunksSent = 0; - } else { - chunksSent++; + // Send the method response (or exception) to the main thread. + try { + syncCommPort.postMessage(response); + } catch (exception) { + // Or send the exception thrown when trying to send the response. + syncCommPort.postMessage(exception); } releaseLock(); } -})().catch((exception) => { +})().catch(exception => { // Send the exception up to the main thread so it can throw it and crash the process - process._rawDebug('exception in worker:', exception) - const chunk = serialize(exception); - TypedArrayPrototypeSet(data, chunk); - TypedArrayPrototypeSet(chunkLength, serialize(chunk.byteLength)); - TypedArrayPrototypeSet(done, [1]); + process._rawDebug('exception in worker:', exception) // TODO: Remove this once exception handling is reliable + syncCommPort.postMessage(exception); + releaseLock(); +}); + +process.on('uncaughtException', (err) => { + process._rawDebug('process uncaughtException:', err); + const { triggerUncaughtException } = internalBinding('errors'); releaseLock(); + triggerUncaughtException(err); });