@@ -682,6 +682,128 @@ never be called.
682682* Returns: {number} The same ` triggerAsyncId ` that is passed to the
683683` AsyncResource ` constructor.
684684
685+ <a id =" async-resource-worker-pool " ></a >
686+ ### Using ` AsyncResource ` for a ` Worker ` thread pool
687+
688+ The following example shows how to use the ` AsyncResource ` class to properly
689+ provide async tracking for a [ ` Worker ` ] [ ] pool. Other resource pools, such as
690+ database connection pools, can follow a similar model.
691+
692+ Assuming that the task is adding two numbers, using a file named
693+ ` task_processor.js ` with the following content:
694+
695+ ``` js
696+ const { parentPort } = require (' worker_threads' );
697+ parentPort .on (' message' , (task ) => {
698+ parentPort .postMessage (task .a + task .b );
699+ });
700+ ```
701+
702+ a Worker pool around it could use the following structure:
703+
704+ ``` js
705+ const { AsyncResource } = require (' async_hooks' );
706+ const { EventEmitter } = require (' events' );
707+ const path = require (' path' );
708+ const { Worker } = require (' worker_threads' );
709+
710+ const kTaskInfo = Symbol (' kTaskInfo' );
711+ const kWorkerFreedEvent = Symbol (' kWorkerFreedEvent' );
712+
713+ class WorkerPoolTaskInfo extends AsyncResource {
714+ constructor (callback ) {
715+ super (' WorkerPoolTaskInfo' );
716+ this .callback = callback;
717+ }
718+
719+ done (err , result ) {
720+ this .runInAsyncScope (this .callback , null , err, result);
721+ this .emitDestroy (); // `TaskInfo`s are used only once.
722+ }
723+ }
724+
725+ class WorkerPool extends EventEmitter {
726+ constructor (numThreads ) {
727+ super ();
728+ this .numThreads = numThreads;
729+ this .workers = [];
730+ this .freeWorkers = [];
731+
732+ for (let i = 0 ; i < numThreads; i++ )
733+ this .addNewWorker ();
734+ }
735+
736+ addNewWorker () {
737+ const worker = new Worker (path .resolve (__dirname , ' task_processor.js' ));
738+ worker .on (' message' , (result ) => {
739+ // In case of success: Call the callback that was passed to `runTask`,
740+ // remove the `TaskInfo` associated with the Worker, and mark it as free
741+ // again.
742+ worker[kTaskInfo].done (null , result);
743+ worker[kTaskInfo] = null ;
744+ this .freeWorkers .push (worker);
745+ this .emit (kWorkerFreedEvent);
746+ });
747+ worker .on (' error' , (err ) => {
748+ // In case of an uncaught exception: Call the callback that was passed to
749+ // `runTask` with the error.
750+ if (worker[kTaskInfo])
751+ worker[kTaskInfo].done (err, null );
752+ else
753+ this .emit (' error' , err);
754+ // Remove the worker from the list and start a new Worker to replace the
755+ // current one.
756+ this .workers .splice (this .workers .indexOf (worker), 1 );
757+ this .addNewWorker ();
758+ });
759+ this .workers .push (worker);
760+ this .freeWorkers .push (worker);
761+ }
762+
763+ runTask (task , callback ) {
764+ if (this .freeWorkers .length === 0 ) {
765+ // No free threads, wait until a worker thread becomes free.
766+ this .once (kWorkerFreedEvent, () => this .runTask (task, callback));
767+ return ;
768+ }
769+
770+ const worker = this .freeWorkers .pop ();
771+ worker[kTaskInfo] = new WorkerPoolTaskInfo (callback);
772+ worker .postMessage (task);
773+ }
774+
775+ close () {
776+ for (const worker of this .workers ) worker .terminate ();
777+ }
778+ }
779+
780+ module .exports = WorkerPool;
781+ ```
782+
783+ Without the explicit tracking added by the ` WorkerPoolTaskInfo ` objects,
784+ it would appear that the callbacks are associated with the individual ` Worker `
785+ objects. However, the creation of the ` Worker ` s is not associated with the
786+ creation of the tasks and does not provide information about when tasks
787+ were scheduled.
788+
789+ This pool could be used as follows:
790+
791+ ``` js
792+ const WorkerPool = require (' ./worker_pool.js' );
793+ const os = require (' os' );
794+
795+ const pool = new WorkerPool (os .cpus ().length );
796+
797+ let finished = 0 ;
798+ for (let i = 0 ; i < 10 ; i++ ) {
799+ pool .runTask ({ a: 42 , b: 100 }, (err , result ) => {
800+ console .log (i, err, result);
801+ if (++ finished === 10 )
802+ pool .close ();
803+ });
804+ }
805+ ```
806+
685807[ `after` callback ] : #async_hooks_after_asyncid
686808[ `before` callback ] : #async_hooks_before_asyncid
687809[ `destroy` callback ] : #async_hooks_destroy_asyncid
0 commit comments