Skip to content

Commit

Permalink
unlock worker thread
Browse files Browse the repository at this point in the history
use postMessage from the main thread instead of atomics to signal the worker
  • Loading branch information
targos committed Jan 8, 2023
1 parent 260091f commit d3af446
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
19 changes: 15 additions & 4 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,15 @@ class HooksProxy {
#data;
/**
* The lock/unlock segment of the shared memory. Atomics require this to be a Int32Array. This
* segment is used to tell the main to sleep when the worker is processing, and vice versa
* (for the worker to sleep whilst the main thread is processing).
* segment is used to tell the main to sleep when the worker is processing.
* 0 -> main sleeps
* 1 -> worker sleeps
* 1 -> main wakes up
*/
#lock;
/**
* A MessagePort used to synchronously communicate with the worker.
*/
#syncCommPort;

#isReady = false;

Expand All @@ -468,11 +471,14 @@ class HooksProxy {
*/
constructor() {
const { InternalWorker } = require('internal/worker');
const { MessageChannel } = require('internal/worker/io');

const data = new SharedArrayBuffer(204_800);
const lock = new SharedArrayBuffer(4);
this.#data = new Uint8Array(data);
this.#lock = new Int32Array(lock);
const syncCommChannel = new MessageChannel();
this.#syncCommPort = syncCommChannel.port1;

const worker = this.worker = new InternalWorker('internal/modules/esm/worker', {
stderr: false,
Expand All @@ -482,9 +488,11 @@ class HooksProxy {
workerData: {
data,
lock,
syncCommPort: syncCommChannel.port2,
},
transferList: [syncCommChannel.port2],
});
worker.unref(); // ! Allows the process to eventually exit when worker is in its final sleep.
worker.unref(); // ! Allows the process to eventually exit.
}

makeRequest(method, ...args) {
Expand All @@ -501,6 +509,9 @@ class HooksProxy {
const request = serialize({ method, args });
TypedArrayPrototypeSet(this.#data, request);

// Signal the worker that there is work to do.
this.#syncCommPort.postMessage(true);

const chunks = [];
let done = false;
try {
Expand Down
7 changes: 4 additions & 3 deletions lib/internal/modules/esm/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const { workerData } = require('worker_threads');
// lock = 1 -> worker sleeps
const lock = new Int32Array(workerData.lock); // Required by Atomics
const data = new Uint8Array(workerData.data); // For v8.deserialize/serialize
const { syncCommPort } = workerData; // To receive work signals.
const CHUNK_THRESHOLD = data.length - 26; // `{ done: true/false, value: … }` adds 26 bytes

function releaseLock() {
Expand All @@ -46,14 +47,14 @@ function releaseLock() {
initializationError = exception;
}

syncCommPort.on('message', handleSyncMessage);

// ! Put as little above this line as possible
releaseLock(); // Send 'ready' signal to main

const { deserialize, serialize } = require('v8');

while (true) { // The loop is needed in order to cycle through requests
Atomics.wait(lock, 0, 1); // This pauses the while loop

async function handleSyncMessage() {
const { method, args } = deserialize(data);
TypedArrayPrototypeFill(data, undefined);

Expand Down

0 comments on commit d3af446

Please sign in to comment.