@@ -681,6 +681,128 @@ never be called.
681
681
* Returns: {number} The same ` triggerAsyncId ` that is passed to the
682
682
` AsyncResource ` constructor.
683
683
684
+ <a id =" async-resource-worker-pool " ></a >
685
+ ### Using ` AsyncResource ` for a ` Worker ` thread pool
686
+
687
+ The following example shows how to use the ` AsyncResource ` class to properly
688
+ provide async tracking for a [ ` Worker ` ] [ ] pool. Other resource pools, such as
689
+ database connection pools, can follow a similar model.
690
+
691
+ Assuming that the task is adding two numbers, using a file named
692
+ ` task_processor.js ` with the following content:
693
+
694
+ ``` js
695
+ const { parentPort } = require (' worker_threads' );
696
+ parentPort .on (' message' , (task ) => {
697
+ parentPort .postMessage (task .a + task .b );
698
+ });
699
+ ```
700
+
701
+ a Worker pool around it could use the following structure:
702
+
703
+ ``` js
704
+ const { AsyncResource } = require (' async_hooks' );
705
+ const { EventEmitter } = require (' events' );
706
+ const path = require (' path' );
707
+ const { Worker } = require (' worker_threads' );
708
+
709
+ const kTaskInfo = Symbol (' kTaskInfo' );
710
+ const kWorkerFreedEvent = Symbol (' kWorkerFreedEvent' );
711
+
712
+ class WorkerPoolTaskInfo extends AsyncResource {
713
+ constructor (callback ) {
714
+ super (' WorkerPoolTaskInfo' );
715
+ this .callback = callback;
716
+ }
717
+
718
+ done (err , result ) {
719
+ this .runInAsyncScope (this .callback , null , err, result);
720
+ this .emitDestroy (); // `TaskInfo`s are used only once.
721
+ }
722
+ }
723
+
724
+ class WorkerPool extends EventEmitter {
725
+ constructor (numThreads ) {
726
+ super ();
727
+ this .numThreads = numThreads;
728
+ this .workers = [];
729
+ this .freeWorkers = [];
730
+
731
+ for (let i = 0 ; i < numThreads; i++ )
732
+ this .addNewWorker ();
733
+ }
734
+
735
+ addNewWorker () {
736
+ const worker = new Worker (path .resolve (__dirname , ' task_processor.js' ));
737
+ worker .on (' message' , (result ) => {
738
+ // In case of success: Call the callback that was passed to `runTask`,
739
+ // remove the `TaskInfo` associated with the Worker, and mark it as free
740
+ // again.
741
+ worker[kTaskInfo].done (null , result);
742
+ worker[kTaskInfo] = null ;
743
+ this .freeWorkers .push (worker);
744
+ this .emit (kWorkerFreedEvent);
745
+ });
746
+ worker .on (' error' , (err ) => {
747
+ // In case of an uncaught exception: Call the callback that was passed to
748
+ // `runTask` with the error.
749
+ if (worker[kTaskInfo])
750
+ worker[kTaskInfo].done (err, null );
751
+ else
752
+ this .emit (' error' , err);
753
+ // Remove the worker from the list and start a new Worker to replace the
754
+ // current one.
755
+ this .workers .splice (this .workers .indexOf (worker), 1 );
756
+ this .addNewWorker ();
757
+ });
758
+ this .workers .push (worker);
759
+ this .freeWorkers .push (worker);
760
+ }
761
+
762
+ runTask (task , callback ) {
763
+ if (this .freeWorkers .length === 0 ) {
764
+ // No free threads, wait until a worker thread becomes free.
765
+ this .once (kWorkerFreedEvent, () => this .runTask (task, callback));
766
+ return ;
767
+ }
768
+
769
+ const worker = this .freeWorkers .pop ();
770
+ worker[kTaskInfo] = new WorkerPoolTaskInfo (callback);
771
+ worker .postMessage (task);
772
+ }
773
+
774
+ close () {
775
+ for (const worker of this .workers ) worker .terminate ();
776
+ }
777
+ }
778
+
779
+ module .exports = WorkerPool;
780
+ ```
781
+
782
+ Without the explicit tracking added by the ` WorkerPoolTaskInfo ` objects,
783
+ it would appear that the callbacks are associated with the individual ` Worker `
784
+ objects. However, the creation of the ` Worker ` s is not associated with the
785
+ creation of the tasks and does not provide information about when tasks
786
+ were scheduled.
787
+
788
+ This pool could be used as follows:
789
+
790
+ ``` js
791
+ const WorkerPool = require (' ./worker_pool.js' );
792
+ const os = require (' os' );
793
+
794
+ const pool = new WorkerPool (os .cpus ().length );
795
+
796
+ let finished = 0 ;
797
+ for (let i = 0 ; i < 10 ; i++ ) {
798
+ pool .runTask ({ a: 42 , b: 100 }, (err , result ) => {
799
+ console .log (i, err, result);
800
+ if (++ finished === 10 )
801
+ pool .close ();
802
+ });
803
+ }
804
+ ```
805
+
684
806
[ `after` callback ] : #async_hooks_after_asyncid
685
807
[ `before` callback ] : #async_hooks_before_asyncid
686
808
[ `destroy` callback ] : #async_hooks_destroy_asyncid
0 commit comments