Skip to content

Commit

Permalink
chunks working?? (code is very rough!)
Browse files Browse the repository at this point in the history
  • Loading branch information
JakobJingleheimer authored and GeoffreyBooth committed Jan 6, 2023
1 parent c68a122 commit 84160b5
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 30 deletions.
50 changes: 36 additions & 14 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ class HooksProxy {
constructor() {
const { InternalWorker } = require('internal/worker');

const data = new SharedArrayBuffer(2048);
const data = new SharedArrayBuffer(204_800);
const lock = new SharedArrayBuffer(4);
this.#data = new Uint8Array(data);
this.#lock = new Int32Array(lock);
Expand Down Expand Up @@ -497,30 +497,52 @@ class HooksProxy {
this.#isReady = true;
}

TypedArrayPrototypeFill(this.#data, 0); // Erase handled request/response data

TypedArrayPrototypeFill(this.#data, undefined); // Erase handled request/response data
// console.log('sending request', method, args)
const request = serialize({ method, args });
TypedArrayPrototypeSet(this.#data, request);

Atomics.store(this.#lock, 0, 0); // Send request to worker
Atomics.notify(this.#lock, 0); // Notify worker of new request
Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds
// this.#awaitResponse();

let response;
const chunks = [];
let done = false;
try {
response = deserialize(this.#data);
while (done === false) {
this.#awaitResponse();

const chunk = deserialize(this.#data);
console.log('chunk:', chunk)

if (chunk instanceof Error) { throw chunk; }

({ done } = chunk);
chunks.push(chunk.value);

console.log('awaiting next')
}
} catch (exception) {
if (this.#data.every((byte) => byte === 0)) {
console.log('\nmakeRequest failed!\n');
console.log(this.#data);
console.log('byteLength:', this.#data.byteLength);
console.log('this.#data[0]:', this.#data[0]);
if (this.#data[0] === 0) { // Response should not be empty
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
} else {
throw exception;
}
}
if (response instanceof Error) {
throw response;
} else {
return response;
}

const response = Buffer.concat(chunks);

console.log('[HooksProxy]::makeRequest() finished:', deserialize(response));

return response;
}

#awaitResponse() {
Atomics.store(this.#lock, 0, 0); // Send request to worker
Atomics.notify(this.#lock, 0); // Notify worker of new request
Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds
}
}
ObjectSetPrototypeOf(HooksProxy.prototype, null);
Expand Down
50 changes: 34 additions & 16 deletions lib/internal/modules/esm/worker.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
'use strict';

process.on('uncaughtException', (err) => {
process._rawDebug('process uncaughtException:', new Error().stack);
const { triggerUncaughtException } = internalBinding('errors');
releaseLock();
triggerUncaughtException(err);
});

const {
Int32Array,
ReflectApply,
Expand All @@ -13,7 +20,6 @@ const {
} = primordials;
const {
ERR_INVALID_RETURN_VALUE,
ERR_OUT_OF_RANGE,
} = require('internal/errors').codes;

// Create this WeakMap in js-land because V8 has no C++ API for WeakMap.
Expand All @@ -26,6 +32,7 @@ if (isMainThread) { return; } // Needed to pass some tests that happen to load t
// lock = 1 -> worker sleeps
const lock = new Int32Array(workerData.lock); // Required by Atomics
const data = new Uint8Array(workerData.data); // For v8.deserialize/serialize
const CHUNK_THRESHOLD = data.length - 26; // `{ done: true/false, value: … }` adds 26 bytes

function releaseLock() {
Atomics.store(lock, 0, 1); // Send response to main
Expand All @@ -42,6 +49,7 @@ function releaseLock() {
try {
initializeESM();
hooks = await initializeHooks();
process._rawDebug('[WORKER]:', 'hooks:', hooks);
} catch (exception) {
// If there was an error while parsing and executing a user loader, for example if because a loader contained a syntax error,
// then we need to send the error to the main thread so it can be thrown and printed.
Expand All @@ -53,14 +61,15 @@ function releaseLock() {

const { deserialize, serialize } = require('v8');

while (true) {
while (true) { // The loop is needed in order to cycle through requests
Atomics.wait(lock, 0, 1); // This pauses the while loop

const { method, args } = deserialize(data);
TypedArrayPrototypeFill(data, 0);
process._rawDebug('[WORKER]:', 'request received:', method, args);
TypedArrayPrototypeFill(data, undefined);

// Each potential exception needs to be caught individually so that the correct error is sent to the main thread
let response, serializedResponse;
let response;
if (initializationError) {
response = initializationError;
} else {
Expand All @@ -70,27 +79,36 @@ function releaseLock() {
throw new ERR_INVALID_RETURN_VALUE('object', method, response)
}
} catch (exception) {
response = exception;
TypedArrayPrototypeSet(data, serialize(exception));
releaseLock();
}
}

try {
serializedResponse = serialize(response);
if (serializedResponse.byteLength > data.length) {
throw new ERR_OUT_OF_RANGE('serializedResponse.byteLength', `<= ${data.length}`, serializedResponse.byteLength);
process._rawDebug('[WORKER]:', 'response:', response);
const serializedResponseValue = serialize(response);
const chunkCount = Math.ceil(serializedResponseValue.byteLength / CHUNK_THRESHOLD);
process._rawDebug('[WORKER]:', 'chunkCount:', chunkCount)
for (let i = 0; i < chunkCount; i++) {
const chunk = {
__proto__: null,
done: i === chunkCount - 1,
value: serializedResponseValue.slice(i),
};
process._rawDebug('[WORKER]:', 'chunk:', chunk);
const serializedChunk = serialize(chunk);
process._rawDebug('[WORKER]:', 'serializedChunk:', serializedChunk);
// Send the method response (or exception) to the main thread

TypedArrayPrototypeSet(data, serializedChunk);
// process._rawDebug('[WORKER]:', 'chunk set; releasing lock');
releaseLock();
}
} catch (exception) {
serializedResponse = serialize(exception);
}

// Send the method response (or exception) to the main thread
try {
TypedArrayPrototypeSet(data, serializedResponse);
} catch (exception) {
// Or send the exception thrown when trying to send the response
TypedArrayPrototypeSet(data, serialize(exception));
releaseLock();
}
releaseLock();
}
})().catch((err) => {
// The triggerUncaughtException call below does not terminate the process or surface errors to the user;
Expand Down

0 comments on commit 84160b5

Please sign in to comment.