Skip to content

Commit 0caae95

Browse files
committed
worker: add postMessageToWorker
1 parent 11f4efc commit 0caae95

13 files changed

+665
-5
lines changed

doc/api/errors.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3104,6 +3104,42 @@ The `Worker` instance terminated because it reached its memory limit.
31043104
The path for the main script of a worker is neither an absolute path
31053105
nor a relative path starting with `./` or `../`.
31063106

3107+
<a id="ERR_WORKER_MESSAGING_ERRORED"></a>
3108+
3109+
### `ERR_WORKER_MESSAGING_ERRORED`
3110+
3111+
<!-- YAML
3112+
added: REPLACEME
3113+
-->
3114+
3115+
The destination thread threw an error while processing a message sent via [`postMessageToWorker()`][].
3116+
3117+
<a id="ERR_WORKER_MESSAGING_FAILED"></a>
3118+
3119+
### `ERR_WORKER_MESSAGING_FAILED`
3120+
3121+
<!-- YAML
3122+
added: REPLACEME
3123+
-->
3124+
3125+
The thread requested in [`postMessageToWorker()`][] is invalid or has no `workerMessage` listener.
3126+
3127+
<a id="ERR_WORKER_MESSAGING_SAME_THREAD"></a>
3128+
3129+
### `ERR_WORKER_MESSAGING_SAME_THREAD`
3130+
3131+
<!-- YAML
3132+
added: REPLACEME
3133+
-->
3134+
3135+
The thread id requested in [`postMessageToWorker()`][] is the current thread id.
3136+
3137+
<a id="ERR_WORKER_MESSAGING_TIMEOUT"></a>
3138+
3139+
### `ERR_WORKER_MESSAGING_TIMEOUT`
3140+
3141+
Sending a message via [`postMessageToWorker()`][] timed out.
3142+
31073143
<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>
31083144

31093145
### `ERR_WORKER_UNSERIALIZABLE_ERROR`
@@ -4027,6 +4063,7 @@ An error occurred trying to allocate memory. This should never happen.
40274063
[`new URLSearchParams(iterable)`]: url.md#new-urlsearchparamsiterable
40284064
[`package.json`]: packages.md#nodejs-packagejson-field-definitions
40294065
[`postMessage()`]: worker_threads.md#portpostmessagevalue-transferlist
4066+
[`postMessageToWorker()`]: worker_threads.md#workerpostmessagetoworkerdestination-value-transferlist-timeout
40304067
[`process.on('exit')`]: process.md#event-exit
40314068
[`process.send()`]: process.md#processsendmessage-sendhandle-options-callback
40324069
[`process.setUncaughtExceptionCaptureCallback()`]: process.md#processsetuncaughtexceptioncapturecallbackfn

doc/api/process.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,18 @@ possible to record such errors in an error log, either periodically (which is
327327
likely best for long-running application) or upon process exit (which is likely
328328
most convenient for scripts).
329329

330+
### Event: `'workerMessage'`
331+
332+
<!-- YAML
333+
added: REPLACEME
334+
-->
335+
336+
* `value` {any} A value transmitted using [`postMessageToWorker()`][].
337+
* `source` {number} The transmitting worker thread ID or `0` for the main thread.
338+
339+
The `'workerMessage'` event is emitted for any incoming message send by the other
340+
party by using [`postMessageToWorker()`][].
341+
330342
### Event: `'uncaughtException'`
331343

332344
<!-- YAML
@@ -4073,6 +4085,7 @@ cases:
40734085
[`net.Server`]: net.md#class-netserver
40744086
[`net.Socket`]: net.md#class-netsocket
40754087
[`os.constants.dlopen`]: os.md#dlopen-constants
4088+
[`postMessageToWorker()`]: worker_threads.md#workerpostmessagetoworkerdestination-value-transferlist-timeout
40764089
[`process.argv`]: #processargv
40774090
[`process.config`]: #processconfig
40784091
[`process.execPath`]: #processexecpath

doc/api/worker_threads.md

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,118 @@ if (isMainThread) {
252252
}
253253
```
254254

255+
## `worker.postMessageToWorker(destination, value[, transferList][, timeout])`
256+
257+
<!-- YAML
258+
added: REPLACEME
259+
-->
260+
261+
> Stability: 1.1 - Active development
262+
263+
* `destination` {number} The target thread ID. If the thread ID is invalid, a
264+
[`ERR_WORKER_MESSAGING_FAILED`][] error will be thrown. If the target thread ID is the current thread ID,
265+
a [`ERR_WORKER_MESSAGING_SAME_THREAD`][] error will be thrown.
266+
* `value` {any} The value to send.
267+
* `transferList` {Object\[]} If one or more `MessagePort`-like objects are passed in `value`,
268+
a `transferList` is required for those items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] is thrown.
269+
See [`port.postMessage()`][] for more information.
270+
* `timeout` {number} Time to wait for the message to be delivered in milliseconds.
271+
By default it's `undefined`, which means wait forever. If the operation times out,
272+
a [`ERR_WORKER_MESSAGING_TIMEOUT`][] error is thrown.
273+
* Returns: {Promise} A promise which is fulfilled if the message was successfully processed by destination thread.
274+
275+
Sends a value to another worker, identified by its thread ID.
276+
277+
If the target thread has no listener for the `workerMessage` event, then the operation will throw
278+
a [`ERR_WORKER_MESSAGING_FAILED`][] error.
279+
280+
If the target thread threw an error while processing the `workerMessage` event, then the operation will throw
281+
a [`ERR_WORKER_MESSAGING_ERRORED`][] error.
282+
283+
This method should be used when the target thread is not the direct
284+
parent or child of the current thread.
285+
If the two threads are parent-children, use the [`require('node:worker_threads').parentPort.postMessage()`][]
286+
and the [`worker.postMessage()`][] to let the threads communicate.
287+
288+
The example below shows the use of of `postMessageToWorker`: it creates 10 nested threads,
289+
the last one will try to communicate with the main thread.
290+
291+
```mjs
292+
import { fileURLToPath } from 'node:url';
293+
import { once } from 'node:events';
294+
import process from 'node:process';
295+
import {
296+
isMainThread,
297+
postMessageToWorker,
298+
threadId,
299+
workerData,
300+
Worker,
301+
} from 'node:worker_threads';
302+
303+
const channel = new BroadcastChannel('sync');
304+
const level = workerData?.level ?? 0;
305+
306+
if (level < 10) {
307+
const worker = new Worker(fileURLToPath(import.meta.url), {
308+
workerData: { level: level + 1 },
309+
});
310+
}
311+
312+
if (level === 0) {
313+
process.on('workerMessage', (value, source) => {
314+
console.log(`${source} -> ${threadId}:`, value);
315+
postMessageToWorker(source, { message: 'pong' });
316+
});
317+
} else if (level === 10) {
318+
process.on('workerMessage', (value, source) => {
319+
console.log(`${source} -> ${threadId}:`, value);
320+
channel.postMessage('done');
321+
channel.close();
322+
});
323+
324+
await postMessageToWorker(0, { message: 'ping' });
325+
}
326+
327+
channel.onmessage = channel.close;
328+
```
329+
330+
```cjs
331+
const { once } = require('node:events');
332+
const {
333+
isMainThread,
334+
postMessageToWorker,
335+
threadId,
336+
workerData,
337+
Worker,
338+
} = require('node:worker_threads');
339+
340+
const channel = new BroadcastChannel('sync');
341+
const level = workerData?.level ?? 0;
342+
343+
if (level < 10) {
344+
const worker = new Worker(__filename, {
345+
workerData: { level: level + 1 },
346+
});
347+
}
348+
349+
if (level === 0) {
350+
process.on('workerMessage', (value, source) => {
351+
console.log(`${source} -> ${threadId}:`, value);
352+
postMessageToWorker(source, { message: 'pong' });
353+
});
354+
} else if (level === 10) {
355+
process.on('workerMessage', (value, source) => {
356+
console.log(`${source} -> ${threadId}:`, value);
357+
channel.postMessage('done');
358+
channel.close();
359+
});
360+
361+
postMessageToWorker(0, { message: 'ping' });
362+
}
363+
364+
channel.onmessage = channel.close;
365+
```
366+
255367
## `worker.receiveMessageOnPort(port)`
256368
257369
<!-- YAML
@@ -1399,6 +1511,10 @@ thread spawned will spawn another until the application crashes.
13991511
[`Buffer.allocUnsafe()`]: buffer.md#static-method-bufferallocunsafesize
14001512
[`Buffer`]: buffer.md
14011513
[`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`]: errors.md#err_missing_message_port_in_transfer_list
1514+
[`ERR_WORKER_MESSAGING_ERRORED`]: errors.md#err_worker_messaging_errored
1515+
[`ERR_WORKER_MESSAGING_FAILED`]: errors.md#err_worker_messaging_failed
1516+
[`ERR_WORKER_MESSAGING_SAME_THREAD`]: errors.md#err_worker_messaging_same_thread
1517+
[`ERR_WORKER_MESSAGING_TIMEOUT`]: errors.md#err_worker_messaging_timeout
14021518
[`ERR_WORKER_NOT_RUNNING`]: errors.md#err_worker_not_running
14031519
[`EventTarget`]: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
14041520
[`FileHandle`]: fs.md#class-filehandle

lib/internal/errors.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,6 +1862,10 @@ E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
18621862
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
18631863
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
18641864
Error);
1865+
E('ERR_WORKER_MESSAGING_ERRORED', 'The destination thread threw an error while processing the message', Error);
1866+
E('ERR_WORKER_MESSAGING_FAILED', 'Cannot find the destination thread or listener', Error);
1867+
E('ERR_WORKER_MESSAGING_SAME_THREAD', 'Cannot sent a message to the same thread', Error);
1868+
E('ERR_WORKER_MESSAGING_TIMEOUT', 'Sending a message to another thread timed out', Error);
18651869
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
18661870
E('ERR_WORKER_OUT_OF_MEMORY',
18671871
'Worker terminated due to reaching memory limit: %s', Error);

lib/internal/main/worker_thread.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ const {
4444
kStdioWantsMoreDataCallback,
4545
} = workerIo;
4646

47+
const { setupMainThreadPort } = require('internal/worker/messaging');
48+
4749
const {
4850
onGlobalUncaughtException,
4951
} = require('internal/process/execution');
@@ -96,6 +98,7 @@ port.on('message', (message) => {
9698
hasStdin,
9799
publicPort,
98100
workerData,
101+
mainThreadPort,
99102
} = message;
100103

101104
if (doEval !== 'internal') {
@@ -109,6 +112,7 @@ port.on('message', (message) => {
109112
}
110113

111114
require('internal/worker').assignEnvironmentData(environmentData);
115+
setupMainThreadPort(mainThreadPort);
112116

113117
if (SharedArrayBuffer !== undefined) {
114118
// The counter is only passed to the workers created by the main thread,

lib/internal/worker.js

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ const {
5656
ReadableWorkerStdio,
5757
WritableWorkerStdio,
5858
} = workerIo;
59+
const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker/messaging');
5960
const { deserializeError } = require('internal/error_serdes');
6061
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
6162
const { kEmptyObject } = require('internal/util');
@@ -250,14 +251,18 @@ class Worker extends EventEmitter {
250251

251252
this[kParentSideStdio] = { stdin, stdout, stderr };
252253

253-
const { port1, port2 } = new MessageChannel();
254-
const transferList = [port2];
254+
const mainThreadPortToWorker = createMainThreadPort(this.threadId);
255+
const {
256+
port1: publicPortToParent,
257+
port2: publicPortToWorker,
258+
} = new MessageChannel();
259+
const transferList = [mainThreadPortToWorker, publicPortToWorker];
255260
// If transferList is provided.
256261
if (options.transferList)
257262
ArrayPrototypePush(transferList,
258263
...new SafeArrayIterator(options.transferList));
259264

260-
this[kPublicPort] = port1;
265+
this[kPublicPort] = publicPortToParent;
261266
ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
262267
this[kPublicPort].on(event, (message) => this.emit(event, message));
263268
});
@@ -271,8 +276,9 @@ class Worker extends EventEmitter {
271276
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
272277
workerData: options.workerData,
273278
environmentData,
274-
publicPort: port2,
275279
hasStdin: !!options.stdin,
280+
publicPort: publicPortToWorker,
281+
mainThreadPort: mainThreadPortToWorker,
276282
}, transferList);
277283
// Use this to cache the Worker's loopStart value once available.
278284
this[kLoopStartTime] = -1;
@@ -295,6 +301,7 @@ class Worker extends EventEmitter {
295301
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
296302
drainMessagePort(this[kPublicPort]);
297303
drainMessagePort(this[kPort]);
304+
destroyMainThreadPort(this.threadId);
298305
this.removeAllListeners('message');
299306
this.removeAllListeners('messageerrors');
300307
this[kPublicPort].unref();

0 commit comments

Comments
 (0)