Skip to content

Commit

Permalink
Implement Piscina.move
Browse files Browse the repository at this point in the history
Fixes: #58
  • Loading branch information
jasnell committed May 16, 2020
1 parent 57dc571 commit f36d3af
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 5 deletions.
73 changes: 73 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,79 @@ Is `true` if this code runs inside a `Piscina` threadpool as a Worker.

Provides the current version of this library as a semver string.

### Static method: `move(value)`

By default, any value returned by a worker function will be cloned when
returned back to the Piscina pool, even if that object is capable of
being transfered. The `Piscina.move()` method can be used to wrap and
mark transferable values such that they will by transfered rather than
cloned.

The `value` may be any object supported by Node.js to be transferable
(e.g. `ArrayBuffer`, any `TypedArray`, or `MessagePort`), or any object
implementing the `Transferable` interface.

```js
const { move } = require('piscina');

module.exports = () => {
return move(new ArrayBuffer(10));
}
```
The `move()` method will throw if the `value` is not transferable.
The object returned by the `move()` method should not be set as a
nested value in an object. If it is used, the `move()` object itself
will be cloned as opposed to transfering the object it wraps.
#### Interface: `Transferable`
Objects may implement the `Transferable` interface to create their own
custom transferable objects. This is useful when an object being
passed into or from a worker contains a deeply nested transferable
object such as an `ArrayBuffer` or `MessagePort`.
`Transferable` objects expose two properties inspected by Piscina
to determine how to transfer the object. These properties are
named using the special static `Piscina.transferableSymbol` and
`Piscina.valueSymbol` properties:
* The `Piscina.transferableSymbol` property provides the object
(or objects) that are to be included in the `transferList`.
* The `Piscina.valueSymbol` property provides a surrogate value
to transmit in place of the `Transferable` itself.
Both properties are required.
For example,
```js
const {
move,
transferableSymbol,
valueSymbol
} = require('piscina');

module.exports = () => {
const obj = {
a: { b: new Uint8Array(5); },
c: { new Uint8Array(10); },

get [transferableSymbol]() {
// Transfer the two underlying ArrayBuffers
return [this.a.b.buffer, this.c.buffer];
}

get [valueSymbol]() {
return { a: { b: this.b }, c: this.c };
}
};
return move(obj);
};
```
## Current Limitations (Things we're working on / would love help with)
* Improved Documentation
Expand Down
15 changes: 15 additions & 0 deletions examples/move/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const Piscina = require('../..');
const { resolve } = require('path');

const pool = new Piscina({
filename: resolve(__dirname, 'worker.js'),
idleTimeout: 1000
});

(async () => {
// The task will transfer an ArrayBuffer
// back to the main thread rather than
// cloning it.
const u8 = await pool.runTask();
console.log(u8.length);
})();
7 changes: 7 additions & 0 deletions examples/move/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const { move } = require('../..');

module.exports = () => {
// Using move causes the Uint8Array to be
// transferred rather than cloned.
return move(new Uint8Array(10));
};
34 changes: 34 additions & 0 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,40 @@ export const commonState = {
workerData: undefined
};

// Internal symbol used to mark Transferable objects returned
// by the Piscina.move() function
const kMovable = Symbol('Piscina.kMovable');
export const kTransferable = Symbol.for('Piscina.transferable');
export const kValue = Symbol.for('Piscina.valueOf');

// True if the object implements the Transferable interface
export function isTransferable (value : any) : boolean {
return value != null &&
typeof value === 'object' &&
kTransferable in value &&
kValue in value;
}

// True if object implements Transferable and has been returned
// by the Piscina.move() function
export function isMovable (value : any) : boolean {
return isTransferable(value) && value[kMovable] === true;
}

export function markMovable (value : object) : void {
Object.defineProperty(value, kMovable, {
enumerable: false,
configurable: true,
writable: true,
value: true
});
}

export interface Transferable {
readonly [kTransferable] : object;
readonly [kValue] : object;
}

export const kRequestCountField = 0;
export const kResponseCountField = 1;
export const kFieldCount = 2;
74 changes: 72 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,27 @@ import { AsyncResource } from 'async_hooks';
import { cpus } from 'os';
import { fileURLToPath, URL } from 'url';
import { resolve } from 'path';
import { inspect } from 'util';
import { inspect, types } from 'util';
import assert from 'assert';
import { Histogram, build } from 'hdr-histogram-js';
import { performance } from 'perf_hooks';
import hdrobj from 'hdr-histogram-percentiles-obj';
import { ReadyMessage, RequestMessage, ResponseMessage, StartupMessage, commonState, kResponseCountField, kRequestCountField, kFieldCount } from './common';
import {
ReadyMessage,
RequestMessage,
ResponseMessage,
StartupMessage,
commonState,
kResponseCountField,
kRequestCountField,
kFieldCount,
Transferable,
isTransferable,
markMovable,
isMovable,
kTransferable,
kValue
} from './common';
import { version } from '../package.json';

const cpuCount : number = (() => {
Expand Down Expand Up @@ -49,6 +64,8 @@ type EnvSpecifier = typeof Worker extends {
new (filename : never, options?: { env: infer T }) : Worker;
} ? T : never;

type TransferListItem = TransferList extends (infer T)[] ? T : never;

interface Options {
filename? : string | null,
minThreads? : number,
Expand Down Expand Up @@ -84,6 +101,28 @@ const kDefaultOptions : FilledOptions = {
useAtomics: true
};

class DirectlyTransferable implements Transferable {
#value : object;
constructor (value : object) {
this.#value = value;
}

get [kTransferable] () : object { return this.#value; }

get [kValue] () : object { return this.#value; }
}

class ArrayBufferViewTransferable implements Transferable {
#view : ArrayBufferView;
constructor (view : ArrayBufferView) {
this.#view = view;
}

get [kTransferable] () : object { return this.#view.buffer; }

get [kValue] () : object { return this.#view; }
}

let taskIdCounter = 0;

type TaskCallback = (err : Error, result: any) => void;
Expand Down Expand Up @@ -121,6 +160,19 @@ class TaskInfo extends AsyncResource {
this.callback = callback;
this.task = task;
this.transferList = transferList;

// If the task is a Transferable returned by
// Piscina.move(), then add it to the transferList
// automatically
if (isMovable(task)) {
if (this.transferList === undefined) {
this.transferList = [];
}
this.transferList =
this.transferList.concat(task[kTransferable]);
this.task = task[kValue];
}

this.filename = filename;
this.taskId = taskIdCounter++;
this.abortSignal = abortSignal;
Expand Down Expand Up @@ -860,6 +912,24 @@ class Piscina extends EventEmitterAsyncResource {
static get Piscina () {
return Piscina;
}

static move (val : Transferable | TransferListItem | ArrayBufferView) {
if (val != null && typeof val === 'object' && typeof val !== 'function') {
if (!isTransferable(val)) {
if ((types as any).isArrayBufferView(val)) {
val = new ArrayBufferViewTransferable(val as ArrayBufferView);
} else {
val = new DirectlyTransferable(val);
}
}
markMovable(val);
}
return val;
}

static get transferableSymbol () { return kTransferable; }

static get valueSymbol () { return kValue; }
}

export = Piscina;
22 changes: 19 additions & 3 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import { parentPort, MessagePort, receiveMessageOnPort, workerData } from 'worker_threads';
import { pathToFileURL } from 'url';
import { commonState, ReadyMessage, RequestMessage, ResponseMessage, StartupMessage, kResponseCountField, kRequestCountField } from './common';
import {
commonState,
ReadyMessage,
RequestMessage,
ResponseMessage,
StartupMessage,
kResponseCountField,
kRequestCountField,
isMovable,
kTransferable,
kValue
} from './common';

commonState.isWorkerThread = true;
commonState.workerData = workerData;
Expand Down Expand Up @@ -112,12 +123,17 @@ function onMessage (

(async function () {
let response : ResponseMessage;
const transferList : any[] = [];
try {
const handler = await getHandler(filename);
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`);
}
const result = await handler(task);
let result = await handler(task);
if (isMovable(result)) {
transferList.concat(result[kTransferable]);
result = result[kValue];
}
response = {
taskId,
result: result,
Expand All @@ -137,7 +153,7 @@ function onMessage (
// Post the response to the parent thread, and let it know that we have
// an additional message available. If possible, use Atomics.wait()
// to wait for the next message.
port.postMessage(response);
port.postMessage(response, transferList);
Atomics.add(sharedBuffer, kResponseCountField, 1);
atomicsWaitLoop(port, sharedBuffer);
})().catch(throwInNextTick);
Expand Down
10 changes: 10 additions & 0 deletions test/fixtures/move.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import Piscina from '../..';
import assert from 'assert';
import { types } from 'util';

export default function (moved) {
if (moved !== undefined) {
assert(types.isAnyArrayBuffer(moved));
}
return Piscina.move(new ArrayBuffer(10));
}

0 comments on commit f36d3af

Please sign in to comment.