Skip to content

Commit

Permalink
use syncCommPort to exchange data too
Browse files Browse the repository at this point in the history
  • Loading branch information
targos committed Jan 8, 2023
1 parent d3af446 commit 3f10022
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 86 deletions.
62 changes: 10 additions & 52 deletions lib/internal/modules/esm/hooks.js
Expand Up @@ -10,8 +10,6 @@ const {
SafeSet,
StringPrototypeSlice,
StringPrototypeToUpperCase,
TypedArrayPrototypeFill,
TypedArrayPrototypeSet,
globalThis,
} = primordials;

Expand All @@ -24,14 +22,11 @@ const {
ERR_INVALID_RETURN_VALUE,
} = require('internal/errors').codes;
const { URL } = require('internal/url');
const { receiveMessageOnPort } = require('worker_threads');
const {
isAnyArrayBuffer,
isArrayBufferView,
} = require('internal/util/types');
const {
deserialize,
serialize,
} = require('v8');
const {
validateObject,
validateString,
Expand Down Expand Up @@ -445,12 +440,6 @@ class Hooks {
ObjectSetPrototypeOf(Hooks.prototype, null);

class HooksProxy {
/**
* The request & response segment of the shared memory. TextEncoder/Decoder (needed to convert
* requests & responses into a format supported by the comms channel) reads and writes with
* Uint8Array.
*/
#data;
/**
* The lock/unlock segment of the shared memory. Atomics require this to be a Int32Array. This
* segment is used to tell the main to sleep when the worker is processing.
Expand All @@ -473,9 +462,7 @@ class HooksProxy {
const { InternalWorker } = require('internal/worker');
const { MessageChannel } = require('internal/worker/io');

const data = new SharedArrayBuffer(204_800);
const lock = new SharedArrayBuffer(4);
this.#data = new Uint8Array(data);
this.#lock = new Int32Array(lock);
const syncCommChannel = new MessageChannel();
this.#syncCommPort = syncCommChannel.port1;
Expand All @@ -486,7 +473,6 @@ class HooksProxy {
stdout: false,
trackUnmanagedFds: false,
workerData: {
data,
lock,
syncCommPort: syncCommChannel.port2,
},
Expand All @@ -505,46 +491,18 @@ class HooksProxy {
this.#isReady = true;
}

TypedArrayPrototypeFill(this.#data, undefined); // Erase handled request/response data
const request = serialize({ method, args });
TypedArrayPrototypeSet(this.#data, request);

// Signal the worker that there is work to do.
this.#syncCommPort.postMessage(true);

const chunks = [];
let done = false;
try {
while (done === false) {
this.#awaitResponse();
const chunkContainer = deserialize(this.#data);
// `deserialize` might return an error, such as a `RangeError`, rather than throw it
if (chunkContainer instanceof Error) { throw chunkContainer; }
({ done } = chunkContainer);
const { value } = chunkContainer;
if (!value) { throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); }
ArrayPrototypePush(chunks, value);
}
} catch (exception) {
if (this.#data[0] === 0) { // Response should not be empty
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
} else {
throw exception;
}
}
if (chunks.length === 0) { // Response should not be empty
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
}
const reassembledChunks = Buffer.concat(chunks);
const response = deserialize(reassembledChunks);
if (response instanceof Error) { throw response; }
return response;
}
// Pass work to the worker.
this.#syncCommPort.postMessage({ method, args });

#awaitResponse() {
Atomics.store(this.#lock, 0, 0); // Send request to worker
Atomics.notify(this.#lock, 0); // Notify worker of new request
Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds

const response = receiveMessageOnPort(this.#syncCommPort).message;
if (response instanceof Error) {
throw response;
} else {
return response;
}
}
}
ObjectSetPrototypeOf(HooksProxy.prototype, null);
Expand Down
44 changes: 10 additions & 34 deletions lib/internal/modules/esm/worker.js
Expand Up @@ -4,9 +4,6 @@ const {
Int32Array,
ReflectApply,
SafeWeakMap,
TypedArrayPrototypeFill,
TypedArrayPrototypeSet,
Uint8Array,
globalThis: {
Atomics,
},
Expand All @@ -22,7 +19,6 @@ const { workerData } = require('worker_threads');
// lock = 0 -> main sleeps
// lock = 1 -> worker sleeps
const lock = new Int32Array(workerData.lock); // Required by Atomics
const data = new Uint8Array(workerData.data); // For v8.deserialize/serialize
const { syncCommPort } = workerData; // To receive work signals.
const CHUNK_THRESHOLD = data.length - 26; // `{ done: true/false, value: … }` adds 26 bytes

Expand All @@ -49,16 +45,11 @@ function releaseLock() {

syncCommPort.on('message', handleSyncMessage);

// ! Put as little above this line as possible
releaseLock(); // Send 'ready' signal to main
// ! Put as little above this line as possible.
releaseLock(); // Send 'ready' signal to main.

const { deserialize, serialize } = require('v8');

async function handleSyncMessage() {
const { method, args } = deserialize(data);
TypedArrayPrototypeFill(data, undefined);

// Each potential exception needs to be caught individually so that the correct error is sent to the main thread
async function handleSyncMessage({ method, args }) {
// Each potential exception needs to be caught individually so that the correct error is sent to the main thread.
let response;
if (initializationError) {
response = initializationError;
Expand All @@ -69,33 +60,18 @@ function releaseLock() {
throw new ERR_INVALID_RETURN_VALUE('object', method, response)
}
} catch (exception) {
TypedArrayPrototypeSet(data, serialize({
__proto__: null,
done: i === chunkCount - 1,
value: serialize(exception),
}));
releaseLock();
response = exception
}
}

// Send the method response (or exception) to the main thread.
try {
const serializedResponseValue = serialize(response);
const chunkCount = Math.ceil(serializedResponseValue.byteLength / CHUNK_THRESHOLD);
for (let i = 0; i < chunkCount; i++) {
const chunk = {
__proto__: null,
done: i === chunkCount - 1,
value: serializedResponseValue.slice(i, i + CHUNK_THRESHOLD),
};
const serializedChunk = serialize(chunk);
// Send the method response (or exception) to the main thread
TypedArrayPrototypeSet(data, serializedChunk);
releaseLock();
}
syncCommPort.postMessage(response);
} catch (exception) {
TypedArrayPrototypeSet(data, serialize(exception));
releaseLock();
// Or send the exception thrown when trying to send the response.
syncCommPort.postMessage(exception);
}
releaseLock();
}
})().catch((err) => {
// The triggerUncaughtException call below does not terminate the process or surface errors to the user;
Expand Down

0 comments on commit 3f10022

Please sign in to comment.