88 ObjectGetPrototypeOf,
99 ObjectSetPrototypeOf,
1010 Symbol,
11+ SymbolFor,
1112} = primordials ;
1213
1314const {
@@ -24,20 +25,23 @@ const {
2425 stopMessagePort
2526} = internalBinding ( 'messaging' ) ;
2627const {
27- threadId,
2828 getEnvMessagePort
2929} = internalBinding ( 'worker' ) ;
3030
3131const { Readable, Writable } = require ( 'stream' ) ;
32- const EventEmitter = require ( 'events' ) ;
32+ const {
33+ Event,
34+ NodeEventTarget,
35+ defineEventHandler,
36+ initNodeEventTarget,
37+ kNewListener,
38+ kRemoveListener,
39+ kPreprocessEvent,
40+ } = require ( 'internal/event_target' ) ;
3341const { inspect } = require ( 'internal/util/inspect' ) ;
34- let debug = require ( 'internal/util/debuglog' ) . debuglog ( 'worker' , ( fn ) => {
35- debug = fn ;
36- } ) ;
3742
3843const kIncrementsPortRef = Symbol ( 'kIncrementsPortRef' ) ;
3944const kName = Symbol ( 'kName' ) ;
40- const kOnMessageListener = Symbol ( 'kOnMessageListener' ) ;
4145const kPort = Symbol ( 'kPort' ) ;
4246const kWaitingStreams = Symbol ( 'kWaitingStreams' ) ;
4347const kWritableCallbacks = Symbol ( 'kWritableCallbacks' ) ;
@@ -54,55 +58,44 @@ const messageTypes = {
5458} ;
5559
5660// We have to mess with the MessagePort prototype a bit, so that a) we can make
57- // it inherit from EventEmitter , even though it is a C++ class, and b) we do
61+ // it inherit from NodeEventTarget , even though it is a C++ class, and b) we do
5862// not provide methods that are not present in the Browser and not documented
5963// on our side (e.g. hasRef).
6064// Save a copy of the original set of methods as a shallow clone.
6165const MessagePortPrototype = ObjectCreate (
6266 ObjectGetPrototypeOf ( MessagePort . prototype ) ,
6367 ObjectGetOwnPropertyDescriptors ( MessagePort . prototype ) ) ;
6468// Set up the new inheritance chain.
65- ObjectSetPrototypeOf ( MessagePort , EventEmitter ) ;
66- ObjectSetPrototypeOf ( MessagePort . prototype , EventEmitter . prototype ) ;
69+ ObjectSetPrototypeOf ( MessagePort , NodeEventTarget ) ;
70+ ObjectSetPrototypeOf ( MessagePort . prototype , NodeEventTarget . prototype ) ;
6771// Copy methods that are inherited from HandleWrap, because
6872// changing the prototype of MessagePort.prototype implicitly removed them.
6973MessagePort . prototype . ref = MessagePortPrototype . ref ;
7074MessagePort . prototype . unref = MessagePortPrototype . unref ;
7175
72- // A communication channel consisting of a handle (that wraps around an
73- // uv_async_t) which can receive information from other threads and emits
74- // .onmessage events, and a function used for sending data to a MessagePort
75- // in some other thread.
76- MessagePort . prototype [ kOnMessageListener ] = function onmessage ( event ) {
77- if ( event . data && event . data . type !== messageTypes . STDIO_WANTS_MORE_DATA )
78- debug ( `[${ threadId } ] received message` , event ) ;
79- // Emit the deserialized object to userland.
80- this . emit ( 'message' , event . data ) ;
81- } ;
82-
83- // This is for compatibility with the Web's MessagePort API. It makes sense to
84- // provide it as an `EventEmitter` in Node.js, but if somebody overrides
85- // `onmessage`, we'll switch over to the Web API model.
86- ObjectDefineProperty ( MessagePort . prototype , 'onmessage' , {
87- enumerable : true ,
88- configurable : true ,
89- get ( ) {
90- return this [ kOnMessageListener ] ;
91- } ,
92- set ( value ) {
93- this [ kOnMessageListener ] = value ;
94- if ( typeof value === 'function' ) {
95- this . ref ( ) ;
96- MessagePortPrototype . start . call ( this ) ;
97- } else {
98- this . unref ( ) ;
99- stopMessagePort ( this ) ;
100- }
76+ class MessageEvent extends Event {
77+ constructor ( data , target , type ) {
78+ super ( type ) ;
79+ this . data = data ;
10180 }
102- } ) ;
81+ }
82+
83+ ObjectDefineProperty (
84+ MessagePort . prototype ,
85+ SymbolFor ( 'nodejs.internal.MessageEvent' ) ,
86+ {
87+ value : MessageEvent ,
88+ configurable : true ,
89+ writable : false ,
90+ enumerable : false ,
91+ } ) ;
10392
10493// This is called from inside the `MessagePort` constructor.
10594function oninit ( ) {
95+ initNodeEventTarget ( this ) ;
96+ // TODO(addaleax): This should be on MessagePort.prototype, but
97+ // defineEventHandler() does not support that.
98+ defineEventHandler ( this , 'message' ) ;
10699 setupPortReferencing ( this , this , 'message' ) ;
107100}
108101
@@ -112,9 +105,21 @@ ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
112105 value : oninit
113106} ) ;
114107
108+ MessagePort . prototype [ kPreprocessEvent ] = function ( event ) {
109+ if ( event . type === 'message' || event . type === 'messageerror' )
110+ return event . data ;
111+ return event ;
112+ } ;
113+
114+ class MessagePortCloseEvent extends Event {
115+ constructor ( ) {
116+ super ( 'close' ) ;
117+ }
118+ }
119+
115120// This is called after the underlying `uv_async_t` has been closed.
116121function onclose ( ) {
117- this . emit ( 'close' ) ;
122+ this . dispatchEvent ( new MessagePortCloseEvent ( ) ) ;
118123}
119124
120125ObjectDefineProperty ( MessagePort . prototype , handleOnCloseSymbol , {
@@ -156,18 +161,36 @@ function setupPortReferencing(port, eventEmitter, eventName) {
156161 // If there are none or all are removed, unref() the channel so the worker
157162 // can shutdown gracefully.
158163 port . unref ( ) ;
159- eventEmitter . on ( 'newListener' , ( name ) => {
160- if ( name === eventName && eventEmitter . listenerCount ( eventName ) === 0 ) {
164+ eventEmitter . on ( 'newListener' , function ( name ) {
165+ if ( name === eventName ) newListener ( eventEmitter . listenerCount ( name ) ) ;
166+ } ) ;
167+ eventEmitter . on ( 'removeListener' , function ( name ) {
168+ if ( name === eventName ) removeListener ( eventEmitter . listenerCount ( name ) ) ;
169+ } ) ;
170+ const origNewListener = eventEmitter [ kNewListener ] ;
171+ eventEmitter [ kNewListener ] = function ( size , type , ...args ) {
172+ if ( type === eventName ) newListener ( size - 1 ) ;
173+ return origNewListener . call ( this , size , type , ...args ) ;
174+ } ;
175+ const origRemoveListener = eventEmitter [ kRemoveListener ] ;
176+ eventEmitter [ kRemoveListener ] = function ( size , type , ...args ) {
177+ if ( type === eventName ) removeListener ( size ) ;
178+ return origRemoveListener . call ( this , size , type , ...args ) ;
179+ } ;
180+
181+ function newListener ( size ) {
182+ if ( size === 0 ) {
161183 port . ref ( ) ;
162184 MessagePortPrototype . start . call ( port ) ;
163185 }
164- } ) ;
165- eventEmitter . on ( 'removeListener' , ( name ) => {
166- if ( name === eventName && eventEmitter . listenerCount ( eventName ) === 0 ) {
186+ }
187+
188+ function removeListener ( size ) {
189+ if ( size === 0 ) {
167190 stopMessagePort ( port ) ;
168191 port . unref ( ) ;
169192 }
170- } ) ;
193+ }
171194}
172195
173196
0 commit comments