Skip to content

Commit f820ce6

Browse files
addaleaxcodebytere
authored andcommittedMar 30, 2020
doc: add AsyncResource + Worker pool example
Use Worker thread pools as an example of how `AsyncResource` can be used to track async state across callbacks. PR-URL: #31601 Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Chengzhong Wu <legendecas@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: Anto Aravinth <anto.aravinth.cse@gmail.com>
1 parent e2c40cc commit f820ce6

File tree

2 files changed

+126
-1
lines changed

2 files changed

+126
-1
lines changed
 

‎doc/api/async_hooks.md

+122
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,128 @@ never be called.
681681
* Returns: {number} The same `triggerAsyncId` that is passed to the
682682
`AsyncResource` constructor.
683683

684+
<a id="async-resource-worker-pool"></a>
685+
### Using `AsyncResource` for a `Worker` thread pool
686+
687+
The following example shows how to use the `AsyncResource` class to properly
688+
provide async tracking for a [`Worker`][] pool. Other resource pools, such as
689+
database connection pools, can follow a similar model.
690+
691+
Assuming that the task is adding two numbers, using a file named
692+
`task_processor.js` with the following content:
693+
694+
```js
695+
const { parentPort } = require('worker_threads');
696+
parentPort.on('message', (task) => {
697+
parentPort.postMessage(task.a + task.b);
698+
});
699+
```
700+
701+
a Worker pool around it could use the following structure:
702+
703+
```js
704+
const { AsyncResource } = require('async_hooks');
705+
const { EventEmitter } = require('events');
706+
const path = require('path');
707+
const { Worker } = require('worker_threads');
708+
709+
const kTaskInfo = Symbol('kTaskInfo');
710+
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
711+
712+
class WorkerPoolTaskInfo extends AsyncResource {
713+
constructor(callback) {
714+
super('WorkerPoolTaskInfo');
715+
this.callback = callback;
716+
}
717+
718+
done(err, result) {
719+
this.runInAsyncScope(this.callback, null, err, result);
720+
this.emitDestroy(); // `TaskInfo`s are used only once.
721+
}
722+
}
723+
724+
class WorkerPool extends EventEmitter {
725+
constructor(numThreads) {
726+
super();
727+
this.numThreads = numThreads;
728+
this.workers = [];
729+
this.freeWorkers = [];
730+
731+
for (let i = 0; i < numThreads; i++)
732+
this.addNewWorker();
733+
}
734+
735+
addNewWorker() {
736+
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
737+
worker.on('message', (result) => {
738+
// In case of success: Call the callback that was passed to `runTask`,
739+
// remove the `TaskInfo` associated with the Worker, and mark it as free
740+
// again.
741+
worker[kTaskInfo].done(null, result);
742+
worker[kTaskInfo] = null;
743+
this.freeWorkers.push(worker);
744+
this.emit(kWorkerFreedEvent);
745+
});
746+
worker.on('error', (err) => {
747+
// In case of an uncaught exception: Call the callback that was passed to
748+
// `runTask` with the error.
749+
if (worker[kTaskInfo])
750+
worker[kTaskInfo].done(err, null);
751+
else
752+
this.emit('error', err);
753+
// Remove the worker from the list and start a new Worker to replace the
754+
// current one.
755+
this.workers.splice(this.workers.indexOf(worker), 1);
756+
this.addNewWorker();
757+
});
758+
this.workers.push(worker);
759+
this.freeWorkers.push(worker);
760+
}
761+
762+
runTask(task, callback) {
763+
if (this.freeWorkers.length === 0) {
764+
// No free threads, wait until a worker thread becomes free.
765+
this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
766+
return;
767+
}
768+
769+
const worker = this.freeWorkers.pop();
770+
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
771+
worker.postMessage(task);
772+
}
773+
774+
close() {
775+
for (const worker of this.workers) worker.terminate();
776+
}
777+
}
778+
779+
module.exports = WorkerPool;
780+
```
781+
782+
Without the explicit tracking added by the `WorkerPoolTaskInfo` objects,
783+
it would appear that the callbacks are associated with the individual `Worker`
784+
objects. However, the creation of the `Worker`s is not associated with the
785+
creation of the tasks and does not provide information about when tasks
786+
were scheduled.
787+
788+
This pool could be used as follows:
789+
790+
```js
791+
const WorkerPool = require('./worker_pool.js');
792+
const os = require('os');
793+
794+
const pool = new WorkerPool(os.cpus().length);
795+
796+
let finished = 0;
797+
for (let i = 0; i < 10; i++) {
798+
pool.runTask({ a: 42, b: 100 }, (err, result) => {
799+
console.log(i, err, result);
800+
if (++finished === 10)
801+
pool.close();
802+
});
803+
}
804+
```
805+
684806
[`after` callback]: #async_hooks_after_asyncid
685807
[`before` callback]: #async_hooks_before_asyncid
686808
[`destroy` callback]: #async_hooks_destroy_asyncid

‎doc/api/worker_threads.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ overhead of creating Workers would likely exceed their benefit.
5151

5252
When implementing a worker pool, use the [`AsyncResource`][] API to inform
5353
diagnostic tools (e.g. in order to provide asynchronous stack traces) about the
54-
correlation between tasks and their outcomes.
54+
correlation between tasks and their outcomes. See
55+
["Using `AsyncResource` for a `Worker` thread pool"][async-resource-worker-pool]
56+
in the `async_hooks` documentation for an example implementation.
5557

5658
Worker threads inherit non-process-specific options by default. Refer to
5759
[`Worker constructor options`][] to know how to customize worker thread options,
@@ -759,6 +761,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
759761
[`worker.terminate()`]: #worker_threads_worker_terminate
760762
[`worker.threadId`]: #worker_threads_worker_threadid_1
761763
[Addons worker support]: addons.html#addons_worker_support
764+
[async-resource-worker-pool]: async_hooks.html#async-resource-worker-pool
762765
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
763766
[Signals events]: process.html#process_signal_events
764767
[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API

0 commit comments

Comments
 (0)
Please sign in to comment.