Skip to content

Commit

Permalink
doc: add AsyncResource + Worker pool example
Browse files Browse the repository at this point in the history
Use Worker thread pools as an example of how `AsyncResource`
can be used to track async state across callbacks.

PR-URL: #31601
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Chengzhong Wu <legendecas@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Anto Aravinth <anto.aravinth.cse@gmail.com>
  • Loading branch information
addaleax authored and codebytere committed Mar 17, 2020
1 parent 53305de commit c21861a
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 1 deletion.
122 changes: 122 additions & 0 deletions doc/api/async_hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,128 @@ never be called.
* Returns: {number} The same `triggerAsyncId` that is passed to the
`AsyncResource` constructor.

<a id="async-resource-worker-pool"></a>
### Using `AsyncResource` for a `Worker` thread pool

The following example shows how to use the `AsyncResource` class to properly
provide async tracking for a [`Worker`][] pool. Other resource pools, such as
database connection pools, can follow a similar model.

Assuming that the task is adding two numbers, using a file named
`task_processor.js` with the following content:

```js
const { parentPort } = require('worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
```

a Worker pool around it could use the following structure:

```js
const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
const path = require('path');
const { Worker } = require('worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}

done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}

class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];

for (let i = 0; i < numThreads; i++)
this.addNewWorker();
}

addNewWorker() {
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
}

runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
return;
}

const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}

close() {
for (const worker of this.workers) worker.terminate();
}
}

module.exports = WorkerPool;
```

Without the explicit tracking added by the `WorkerPoolTaskInfo` objects,
it would appear that the callbacks are associated with the individual `Worker`
objects. However, the creation of the `Worker`s is not associated with the
creation of the tasks and does not provide information about when tasks
were scheduled.

This pool could be used as follows:

```js
const WorkerPool = require('./worker_pool.js');
const os = require('os');

const pool = new WorkerPool(os.cpus().length);

let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
```

[`after` callback]: #async_hooks_after_asyncid
[`before` callback]: #async_hooks_before_asyncid
[`destroy` callback]: #async_hooks_destroy_asyncid
Expand Down
5 changes: 4 additions & 1 deletion doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ overhead of creating Workers would likely exceed their benefit.

When implementing a worker pool, use the [`AsyncResource`][] API to inform
diagnostic tools (e.g. in order to provide asynchronous stack traces) about the
correlation between tasks and their outcomes.
correlation between tasks and their outcomes. See
["Using `AsyncResource` for a `Worker` thread pool"][async-resource-worker-pool]
in the `async_hooks` documentation for an example implementation.

Worker threads inherit non-process-specific options by default. Refer to
[`Worker constructor options`][] to know how to customize worker thread options,
Expand Down Expand Up @@ -759,6 +761,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`worker.terminate()`]: #worker_threads_worker_terminate
[`worker.threadId`]: #worker_threads_worker_threadid_1
[Addons worker support]: addons.html#addons_worker_support
[async-resource-worker-pool]: async_hooks.html#async-resource-worker-pool
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
[Signals events]: process.html#process_signal_events
[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API
Expand Down

0 comments on commit c21861a

Please sign in to comment.