Skip to content

Commit

Permalink
use syncCommPort to exchange data too
Browse files Browse the repository at this point in the history
  • Loading branch information
targos authored and GeoffreyBooth committed Jan 9, 2023
1 parent fe9111e commit 88eac69
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 104 deletions.
72 changes: 10 additions & 62 deletions lib/internal/modules/esm/hooks.js
Expand Up @@ -10,8 +10,6 @@ const {
SafeSet,
StringPrototypeSlice,
StringPrototypeToUpperCase,
TypedArrayPrototypeSet,
TypedArrayPrototypeSlice,
globalThis,
} = primordials;

Expand All @@ -24,14 +22,11 @@ const {
ERR_INVALID_RETURN_VALUE,
} = require('internal/errors').codes;
const { URL } = require('internal/url');
const { receiveMessageOnPort } = require('worker_threads');
const {
isAnyArrayBuffer,
isArrayBufferView,
} = require('internal/util/types');
const {
deserialize,
serialize,
} = require('v8');
const {
validateObject,
validateString,
Expand Down Expand Up @@ -457,16 +452,6 @@ class HooksProxy {
*/
#syncCommPort;

#done;
#chunkLength;

/**
* The request & response segment of the shared memory. TextEncoder/Decoder (needed to convert
* requests & responses into a format supported by the comms channel) reads and writes with
* Uint8Array.
*/
#data;

#isReady = false;

/**
Expand All @@ -478,13 +463,7 @@ class HooksProxy {
const { MessageChannel } = require('internal/worker/io');

const lock = new SharedArrayBuffer(4); // Signal to tell the other thread to sleep or wake
const done = new SharedArrayBuffer(1); // For chunking, to know whether the last chunk has been sent
const chunkLength = new SharedArrayBuffer(32); // For chunking, to know the length of the current chunk
const data = new SharedArrayBuffer(2048); // The data for the request and response
this.#lock = new Int32Array(lock);
this.#done = new Uint8Array(done);
this.#chunkLength = new Uint8Array(chunkLength);
this.#data = new Uint8Array(data);
const syncCommChannel = new MessageChannel();
this.#syncCommPort = syncCommChannel.port1;

Expand All @@ -495,9 +474,6 @@ class HooksProxy {
trackUnmanagedFds: false,
workerData: {
lock,
done,
chunkLength,
data,
syncCommPort: syncCommChannel.port2,
},
transferList: [syncCommChannel.port2],
Expand All @@ -515,50 +491,22 @@ class HooksProxy {
this.#isReady = true;
}

const request = serialize({ method, args });
TypedArrayPrototypeSet(this.#data, request);
TypedArrayPrototypeSet(this.#chunkLength, serialize(request.byteLength));
TypedArrayPrototypeSet(this.#done, [1]);

const chunks = [];
let done = false;
while (done === false) {
this.#awaitResponse();
// Pass work to the worker.
this.#syncCommPort.postMessage({ method, args });

try {
var chunkLength = deserialize(this.#chunkLength);
} catch (err) {
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
}
if (!chunkLength) { throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); }

const chunk = TypedArrayPrototypeSlice(this.#data, 0, chunkLength);
if (!chunk) { throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined); }

ArrayPrototypePush(chunks, chunk);
if (this.#done[0] === 1) { done = true; }
}
if (chunks.length === 0) { // Response should not be empty
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
}
const reassembledChunks = Buffer.concat(chunks);
const response = deserialize(reassembledChunks);
Atomics.store(this.#lock, 0, 0); // Reset lock.
Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds.

const response = receiveMessageOnPort(this.#syncCommPort).message;
if (response instanceof Error) {
// An exception was thrown in the worker thread; re-throw to crash the process
const { triggerUncaughtException } = internalBinding('errors');
triggerUncaughtException(response);
} else if (!response) {
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
} else {
return response;
}

return response;
}

#awaitResponse() {
// Signal the worker that there is work to do.
this.#syncCommPort.postMessage(true);

Atomics.store(this.#lock, 0, 0); // Reset lock.
Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds.
}
}
ObjectSetPrototypeOf(HooksProxy.prototype, null);
Expand Down
72 changes: 30 additions & 42 deletions lib/internal/modules/esm/worker.js
Expand Up @@ -4,9 +4,6 @@ const {
Int32Array,
ReflectApply,
SafeWeakMap,
TypedArrayPrototypeSet,
TypedArrayPrototypeSlice,
Uint8Array,
globalThis: {
Atomics,
},
Expand All @@ -24,9 +21,6 @@ const { workerData } = require('worker_threads');
// lock = 0 -> main sleeps
// lock = 1 -> worker sleeps
const lock = new Int32Array(workerData.lock); // Required by Atomics
const done = new Uint8Array(workerData.done); // Coordinate chunks between main and worker
const chunkLength = new Uint8Array(workerData.chunkLength); // Coordinate chunks between main and worker
const data = new Uint8Array(workerData.data); // Chunks content
const { syncCommPort } = workerData; // To receive work signals.

function releaseLock() {
Expand All @@ -52,54 +46,48 @@ function releaseLock() {

syncCommPort.on('message', handleSyncMessage);

// ! Put as little above this line as possible
releaseLock(); // Send 'ready' signal to main
// ! Put as little above this line as possible.
releaseLock(); // Send 'ready' signal to main.

// Preserve state across iterations of the loop so that we can return responses in chunks
let serializedResponse, chunksCount, chunksSent = 0;
async function handleSyncMessage() {
async function handleSyncMessage({ method, args }) {
// Each potential exception needs to be caught individually so that the correct error is sent to the main thread.
let response;
if (initializationError) {
serializedResponse = serialize(initializationError);
chunksCount = 1;
} else if (done[0] !== 0) { // Not currently sending chunks to main thread; process new request
const requestLength = deserialize(chunkLength);
const { method, args } = deserialize(data.slice(0, requestLength));
response = initializationError;
} else {
if (!hooks[method]) {
throw new ERR_INVALID_ARG_VALUE('method', method);
}

const response = await ReflectApply(hooks[method], hooks, args);
if (!response) {
throw new ERR_INVALID_RETURN_VALUE('object', method, response)
try {
response = await ReflectApply(hooks[method], hooks, args);
if (!response) {
throw new ERR_INVALID_RETURN_VALUE('object', method, response);
}
} catch (exception) {
response = exception;
}

serializedResponse = serialize(response);
chunksCount = Math.ceil(serializedResponse.byteLength / data.length);
chunksSent = 0;
}

const startIndex = chunksSent * data.length;
const endIndex = startIndex + data.length;
const chunk = TypedArrayPrototypeSlice(serializedResponse, startIndex, endIndex);
const isLastChunk = chunksSent === chunksCount - 1;
TypedArrayPrototypeSet(data, chunk);
TypedArrayPrototypeSet(chunkLength, serialize(chunk.byteLength));
TypedArrayPrototypeSet(done, isLastChunk ? [1] : [0]);
if (isLastChunk) {
serializedResponse = undefined;
chunksCount = undefined;
chunksSent = 0;
} else {
chunksSent++;
// Send the method response (or exception) to the main thread.
try {
syncCommPort.postMessage(response);
} catch (exception) {
// Or send the exception thrown when trying to send the response.
syncCommPort.postMessage(exception);
}
releaseLock();
}
})().catch((exception) => {
})().catch(exception => {
// Send the exception up to the main thread so it can throw it and crash the process
process._rawDebug('exception in worker:', exception)
const chunk = serialize(exception);
TypedArrayPrototypeSet(data, chunk);
TypedArrayPrototypeSet(chunkLength, serialize(chunk.byteLength));
TypedArrayPrototypeSet(done, [1]);
process._rawDebug('exception in worker:', exception) // TODO: Remove this once exception handling is reliable
syncCommPort.postMessage(exception);
releaseLock();
});

process.on('uncaughtException', (err) => {
process._rawDebug('process uncaughtException:', err);
const { triggerUncaughtException } = internalBinding('errors');
releaseLock();
triggerUncaughtException(err);
});

0 comments on commit 88eac69

Please sign in to comment.