@@ -24,20 +24,23 @@ const {
2424 stopMessagePort
2525} = internalBinding ( 'messaging' ) ;
2626const {
27- threadId,
2827 getEnvMessagePort
2928} = internalBinding ( 'worker' ) ;
3029
3130const { Readable, Writable } = require ( 'stream' ) ;
32- const EventEmitter = require ( 'events' ) ;
31+ const {
32+ Event,
33+ NodeEventTarget,
34+ defineEventHandler,
35+ initNodeEventTarget,
36+ kCreateEvent,
37+ kNewListener,
38+ kRemoveListener,
39+ } = require ( 'internal/event_target' ) ;
3340const { inspect } = require ( 'internal/util/inspect' ) ;
34- let debug = require ( 'internal/util/debuglog' ) . debuglog ( 'worker' , ( fn ) => {
35- debug = fn ;
36- } ) ;
3741
3842const kIncrementsPortRef = Symbol ( 'kIncrementsPortRef' ) ;
3943const kName = Symbol ( 'kName' ) ;
40- const kOnMessageListener = Symbol ( 'kOnMessageListener' ) ;
4144const kPort = Symbol ( 'kPort' ) ;
4245const kWaitingStreams = Symbol ( 'kWaitingStreams' ) ;
4346const kWritableCallbacks = Symbol ( 'kWritableCallbacks' ) ;
@@ -54,55 +57,47 @@ const messageTypes = {
5457} ;
5558
5659// We have to mess with the MessagePort prototype a bit, so that a) we can make
57- // it inherit from EventEmitter , even though it is a C++ class, and b) we do
60+ // it inherit from NodeEventTarget , even though it is a C++ class, and b) we do
5861// not provide methods that are not present in the Browser and not documented
5962// on our side (e.g. hasRef).
6063// Save a copy of the original set of methods as a shallow clone.
6164const MessagePortPrototype = ObjectCreate (
6265 ObjectGetPrototypeOf ( MessagePort . prototype ) ,
6366 ObjectGetOwnPropertyDescriptors ( MessagePort . prototype ) ) ;
6467// Set up the new inheritance chain.
65- ObjectSetPrototypeOf ( MessagePort , EventEmitter ) ;
66- ObjectSetPrototypeOf ( MessagePort . prototype , EventEmitter . prototype ) ;
68+ ObjectSetPrototypeOf ( MessagePort , NodeEventTarget ) ;
69+ ObjectSetPrototypeOf ( MessagePort . prototype , NodeEventTarget . prototype ) ;
6770// Copy methods that are inherited from HandleWrap, because
6871// changing the prototype of MessagePort.prototype implicitly removed them.
6972MessagePort . prototype . ref = MessagePortPrototype . ref ;
7073MessagePort . prototype . unref = MessagePortPrototype . unref ;
7174
72- // A communication channel consisting of a handle (that wraps around an
73- // uv_async_t) which can receive information from other threads and emits
74- // .onmessage events, and a function used for sending data to a MessagePort
75- // in some other thread.
76- MessagePort . prototype [ kOnMessageListener ] = function onmessage ( event ) {
77- if ( event . data && event . data . type !== messageTypes . STDIO_WANTS_MORE_DATA )
78- debug ( `[${ threadId } ] received message` , event ) ;
79- // Emit the deserialized object to userland.
80- this . emit ( 'message' , event . data ) ;
81- } ;
82-
83- // This is for compatibility with the Web's MessagePort API. It makes sense to
84- // provide it as an `EventEmitter` in Node.js, but if somebody overrides
85- // `onmessage`, we'll switch over to the Web API model.
86- ObjectDefineProperty ( MessagePort . prototype , 'onmessage' , {
87- enumerable : true ,
88- configurable : true ,
89- get ( ) {
90- return this [ kOnMessageListener ] ;
91- } ,
92- set ( value ) {
93- this [ kOnMessageListener ] = value ;
94- if ( typeof value === 'function' ) {
95- this . ref ( ) ;
96- MessagePortPrototype . start . call ( this ) ;
97- } else {
98- this . unref ( ) ;
99- stopMessagePort ( this ) ;
100- }
75+ class MessageEvent extends Event {
76+ constructor ( data , target , type ) {
77+ super ( type ) ;
78+ this . data = data ;
10179 }
102- } ) ;
80+ }
81+
82+ ObjectDefineProperty (
83+ MessagePort . prototype ,
84+ kCreateEvent ,
85+ {
86+ value : function ( data , type ) {
87+ return new MessageEvent ( data , this , type ) ;
88+ } ,
89+ configurable : false ,
90+ writable : false ,
91+ enumerable : false ,
92+ } ) ;
10393
10494// This is called from inside the `MessagePort` constructor.
10595function oninit ( ) {
96+ initNodeEventTarget ( this ) ;
97+ // TODO(addaleax): This should be on MessagePort.prototype, but
98+ // defineEventHandler() does not support that.
99+ defineEventHandler ( this , 'message' ) ;
100+ defineEventHandler ( this , 'messageerror' ) ;
106101 setupPortReferencing ( this , this , 'message' ) ;
107102}
108103
@@ -112,9 +107,15 @@ ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
112107 value : oninit
113108} ) ;
114109
110+ class MessagePortCloseEvent extends Event {
111+ constructor ( ) {
112+ super ( 'close' ) ;
113+ }
114+ }
115+
115116// This is called after the underlying `uv_async_t` has been closed.
116117function onclose ( ) {
117- this . emit ( 'close' ) ;
118+ this . dispatchEvent ( new MessagePortCloseEvent ( ) ) ;
118119}
119120
120121ObjectDefineProperty ( MessagePort . prototype , handleOnCloseSymbol , {
@@ -156,18 +157,36 @@ function setupPortReferencing(port, eventEmitter, eventName) {
156157 // If there are none or all are removed, unref() the channel so the worker
157158 // can shutdown gracefully.
158159 port . unref ( ) ;
159- eventEmitter . on ( 'newListener' , ( name ) => {
160- if ( name === eventName && eventEmitter . listenerCount ( eventName ) === 0 ) {
160+ eventEmitter . on ( 'newListener' , function ( name ) {
161+ if ( name === eventName ) newListener ( eventEmitter . listenerCount ( name ) ) ;
162+ } ) ;
163+ eventEmitter . on ( 'removeListener' , function ( name ) {
164+ if ( name === eventName ) removeListener ( eventEmitter . listenerCount ( name ) ) ;
165+ } ) ;
166+ const origNewListener = eventEmitter [ kNewListener ] ;
167+ eventEmitter [ kNewListener ] = function ( size , type , ...args ) {
168+ if ( type === eventName ) newListener ( size - 1 ) ;
169+ return origNewListener . call ( this , size , type , ...args ) ;
170+ } ;
171+ const origRemoveListener = eventEmitter [ kRemoveListener ] ;
172+ eventEmitter [ kRemoveListener ] = function ( size , type , ...args ) {
173+ if ( type === eventName ) removeListener ( size ) ;
174+ return origRemoveListener . call ( this , size , type , ...args ) ;
175+ } ;
176+
177+ function newListener ( size ) {
178+ if ( size === 0 ) {
161179 port . ref ( ) ;
162180 MessagePortPrototype . start . call ( port ) ;
163181 }
164- } ) ;
165- eventEmitter . on ( 'removeListener' , ( name ) => {
166- if ( name === eventName && eventEmitter . listenerCount ( eventName ) === 0 ) {
182+ }
183+
184+ function removeListener ( size ) {
185+ if ( size === 0 ) {
167186 stopMessagePort ( port ) ;
168187 port . unref ( ) ;
169188 }
170- } ) ;
189+ }
171190}
172191
173192
0 commit comments