Skip to content

Commit

Permalink
use syncCommPort to exchange data too
Browse files Browse the repository at this point in the history
  • Loading branch information
targos committed Jan 2, 2023
1 parent e46ff62 commit b97e08e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 49 deletions.
36 changes: 4 additions & 32 deletions lib/internal/modules/esm/hooks.js
Expand Up @@ -10,8 +10,6 @@ const {
SafeSet,
StringPrototypeSlice,
StringPrototypeToUpperCase,
TypedArrayPrototypeFill,
TypedArrayPrototypeSet,
globalThis,
} = primordials;

Expand All @@ -24,14 +22,11 @@ const {
ERR_INVALID_RETURN_VALUE,
} = require('internal/errors').codes;
const { URL } = require('internal/url');
const { receiveMessageOnPort } = require('worker_threads');
const {
isAnyArrayBuffer,
isArrayBufferView,
} = require('internal/util/types');
const {
deserialize,
serialize,
} = require('v8');
const {
validateObject,
validateString,
Expand Down Expand Up @@ -445,12 +440,6 @@ class Hooks {
ObjectSetPrototypeOf(Hooks.prototype, null);

class HooksProxy {
/**
* The request & response segment of the shared memory. TextEncoder/Decoder (needed to convert
* requests & responses into a format supported by the comms channel) reads and writes with
* Uint8Array.
*/
#data;
/**
* The lock/unlock segment of the shared memory. Atomics require this to be a Int32Array. This
* segment is used to tell the main to sleep when the worker is processing.
Expand All @@ -473,9 +462,7 @@ class HooksProxy {
const { InternalWorker } = require('internal/worker');
const { MessageChannel } = require('internal/worker/io');

const data = new SharedArrayBuffer(2048);
const lock = new SharedArrayBuffer(4);
this.#data = new Uint8Array(data);
this.#lock = new Int32Array(lock);
const syncCommChannel = new MessageChannel();
this.#syncCommPort = syncCommChannel.port1;
Expand All @@ -486,7 +473,6 @@ class HooksProxy {
stdout: false,
trackUnmanagedFds: false,
workerData: {
data,
lock,
syncCommPort: syncCommChannel.port2,
},
Expand All @@ -505,27 +491,13 @@ class HooksProxy {
this.#isReady = true;
}

TypedArrayPrototypeFill(this.#data, 0); // Erase handled request/response data

const request = serialize({ method, args });
TypedArrayPrototypeSet(this.#data, request);

// Signal the worker that there is work to do.
this.#syncCommPort.postMessage(true);
// Pass work to the worker.
this.#syncCommPort.postMessage({ method, args });

Atomics.store(this.#lock, 0, 0); // Reset lock.
Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds.

let response;
try {
response = deserialize(this.#data);
} catch (exception) {
if (this.#data.every((byte) => byte === 0)) {
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
} else {
throw exception;
}
}
const response = receiveMessageOnPort(this.#syncCommPort).message;
if (response instanceof Error) {
throw response;
} else {
Expand Down
20 changes: 3 additions & 17 deletions lib/internal/modules/esm/worker.js
Expand Up @@ -4,16 +4,12 @@ const {
Int32Array,
ReflectApply,
SafeWeakMap,
TypedArrayPrototypeFill,
TypedArrayPrototypeSet,
Uint8Array,
globalThis: {
Atomics,
},
} = primordials;
const {
ERR_INVALID_RETURN_VALUE,
ERR_OUT_OF_RANGE,
} = require('internal/errors').codes;

// Create this WeakMap in js-land because V8 has no C++ API for WeakMap.
Expand All @@ -25,7 +21,6 @@ if (isMainThread) { return; } // Needed to pass some tests that happen to load t
// lock = 0 -> main sleeps
// lock = 1 -> worker sleeps
const lock = new Int32Array(workerData.lock); // Required by Atomics
const data = new Uint8Array(workerData.data); // For v8.deserialize/serialize
const { syncCommPort } = workerData; // To receive work signals.

function releaseLock() {
Expand Down Expand Up @@ -61,7 +56,7 @@ function releaseLock() {
TypedArrayPrototypeFill(data, 0);

// Each potential exception needs to be caught individually so that the correct error is sent to the main thread
let response, serializedResponse;
let response;
if (initializationError) {
response = initializationError;
} else {
Expand All @@ -75,21 +70,12 @@ function releaseLock() {
}
}

try {
serializedResponse = serialize(response);
if (serializedResponse.byteLength > data.length) {
throw new ERR_OUT_OF_RANGE('serializedResponse.byteLength', `<= ${data.length}`, serializedResponse.byteLength);
}
} catch (exception) {
serializedResponse = serialize(exception);
}

// Send the method response (or exception) to the main thread
try {
TypedArrayPrototypeSet(data, serializedResponse);
syncCommPort.postMessage(response);
} catch (exception) {
// Or send the exception thrown when trying to send the response
TypedArrayPrototypeSet(data, serialize(exception));
syncCommPort.postMessage(exception);
}
releaseLock();
}
Expand Down

0 comments on commit b97e08e

Please sign in to comment.