11'use strict' ;
22
33const {
4+ ArrayPrototypeAt,
45 ArrayPrototypeIndexOf,
56 ArrayPrototypePush,
67 ArrayPrototypeSplice,
8+ FunctionPrototypeBind,
79 ObjectCreate,
810 ObjectGetPrototypeOf,
911 ObjectSetPrototypeOf,
12+ PromisePrototypeThen,
13+ PromiseReject,
14+ ReflectApply,
15+ SafeMap,
1016 SymbolHasInstance,
1117} = primordials ;
1218
@@ -23,11 +29,44 @@ const { triggerUncaughtException } = internalBinding('errors');
2329
2430const { WeakReference } = internalBinding ( 'util' ) ;
2531
32+ function decRef ( channel ) {
33+ channel . _weak . decRef ( ) ;
34+ if ( channel . _weak . getRef ( ) === 0 ) {
35+ delete channels [ channel . name ] ;
36+ }
37+ }
38+
39+ function markActive ( channel ) {
40+ // eslint-disable-next-line no-use-before-define
41+ ObjectSetPrototypeOf ( channel , ActiveChannel . prototype ) ;
42+ channel . _subscribers = [ ] ;
43+ channel . _stores = new SafeMap ( ) ;
44+ }
45+
46+ function maybeMarkInactive ( channel ) {
47+ // When there are no more active subscribers, restore to fast prototype.
48+ if ( ! channel . _subscribers . length && ! channel . _stores . size ) {
49+ // eslint-disable-next-line no-use-before-define
50+ ObjectSetPrototypeOf ( channel , Channel . prototype ) ;
51+ channel . _subscribers = undefined ;
52+ channel . _stores = undefined ;
53+ }
54+ }
55+
56+ function defaultTransform ( data ) {
57+ return data
58+ }
59+
60+ function wrapStoreRun ( store , data , next , transform = defaultTransform ) {
61+ return ( ) => store . run ( transform ( data ) , next ) ;
62+ }
63+
2664// TODO(qard): should there be a C++ channel interface?
2765class ActiveChannel {
2866 subscribe ( subscription ) {
2967 validateFunction ( subscription , 'subscription' ) ;
3068 ArrayPrototypePush ( this . _subscribers , subscription ) ;
69+ this . _weak . incRef ( ) ;
3170 }
3271
3372 unsubscribe ( subscription ) {
@@ -36,12 +75,28 @@ class ActiveChannel {
3675
3776 ArrayPrototypeSplice ( this . _subscribers , index , 1 ) ;
3877
39- // When there are no more active subscribers, restore to fast prototype.
40- if ( ! this . _subscribers . length ) {
41- // eslint-disable-next-line no-use-before-define
42- ObjectSetPrototypeOf ( this , Channel . prototype ) ;
78+ decRef ( this ) ;
79+ maybeMarkInactive ( this ) ;
80+
81+ return true ;
82+ }
83+
84+ bindStore ( store , transform ) {
85+ const replacing = this . _stores . has ( store ) ;
86+ if ( ! replacing ) this . _weak . incRef ( ) ;
87+ this . _stores . set ( store , transform ) ;
88+ }
89+
90+ unbindStore ( store ) {
91+ if ( ! this . _stores . has ( store ) ) {
92+ return false ;
4393 }
4494
95+ this . _stores . delete ( store ) ;
96+
97+ decRef ( this ) ;
98+ maybeMarkInactive ( this ) ;
99+
45100 return true ;
46101 }
47102
@@ -61,11 +116,28 @@ class ActiveChannel {
61116 }
62117 }
63118 }
119+
120+ runStores ( data , fn , thisArg , ...args ) {
121+ this . publish ( data ) ;
122+
123+ // Bind base fn first due to AsyncLocalStorage.run not having thisArg
124+ fn = FunctionPrototypeBind ( fn , thisArg , ...args ) ;
125+
126+ for ( const entry of this . _stores . entries ( ) ) {
127+ const store = entry [ 0 ] ;
128+ const transform = entry [ 1 ] ;
129+ fn = wrapStoreRun ( store , data , fn , transform ) ;
130+ }
131+
132+ return fn ( ) ;
133+ }
64134}
65135
66136class Channel {
67137 constructor ( name ) {
68138 this . _subscribers = undefined ;
139+ this . _stores = undefined ;
140+ this . _weak = undefined ;
69141 this . name = name ;
70142 }
71143
@@ -76,20 +148,32 @@ class Channel {
76148 }
77149
78150 subscribe ( subscription ) {
79- ObjectSetPrototypeOf ( this , ActiveChannel . prototype ) ;
80- this . _subscribers = [ ] ;
151+ markActive ( this ) ;
81152 this . subscribe ( subscription ) ;
82153 }
83154
84155 unsubscribe ( ) {
85156 return false ;
86157 }
87158
159+ bindStore ( store , transform ) {
160+ markActive ( this ) ;
161+ this . bindStore ( store , transform ) ;
162+ }
163+
164+ unbindStore ( ) {
165+ return false ;
166+ }
167+
88168 get hasSubscribers ( ) {
89169 return false ;
90170 }
91171
92172 publish ( ) { }
173+
174+ runStores ( data , fn , thisArg , ...args ) {
175+ return ReflectApply ( fn , thisArg , args ) ;
176+ }
93177}
94178
95179const channels = ObjectCreate ( null ) ;
@@ -105,27 +189,17 @@ function channel(name) {
105189 }
106190
107191 channel = new Channel ( name ) ;
108- channels [ name ] = new WeakReference ( channel ) ;
192+ channel . _weak = new WeakReference ( channel ) ;
193+ channels [ name ] = channel . _weak ;
109194 return channel ;
110195}
111196
112197function subscribe ( name , subscription ) {
113- const chan = channel ( name ) ;
114- channels [ name ] . incRef ( ) ;
115- chan . subscribe ( subscription ) ;
198+ return channel ( name ) . subscribe ( subscription ) ;
116199}
117200
118201function unsubscribe ( name , subscription ) {
119- const chan = channel ( name ) ;
120- if ( ! chan . unsubscribe ( subscription ) ) {
121- return false ;
122- }
123-
124- channels [ name ] . decRef ( ) ;
125- if ( channels [ name ] . getRef ( ) === 0 ) {
126- delete channels [ name ] ;
127- }
128- return true ;
202+ return channel ( name ) . unsubscribe ( subscription ) ;
129203}
130204
131205function hasSubscribers ( name ) {
@@ -139,10 +213,165 @@ function hasSubscribers(name) {
139213 return channel . hasSubscribers ;
140214}
141215
216+ const traceEvents = [
217+ 'start' ,
218+ 'end' ,
219+ 'asyncStart' ,
220+ 'asyncEnd' ,
221+ 'error' ,
222+ ] ;
223+
224+ function assertChannel ( value , name ) {
225+ if ( ! ( value instanceof Channel ) ) {
226+ throw new ERR_INVALID_ARG_TYPE ( name , [ 'Channel' ] , value ) ;
227+ }
228+ }
229+
230+ class TracingChannel {
231+ constructor ( nameOrChannels ) {
232+ if ( typeof nameOrChannels === 'string' ) {
233+ this . start = channel ( `tracing:${ nameOrChannels } :start` ) ;
234+ this . end = channel ( `tracing:${ nameOrChannels } :end` ) ;
235+ this . asyncStart = channel ( `tracing:${ nameOrChannels } :asyncStart` ) ;
236+ this . asyncEnd = channel ( `tracing:${ nameOrChannels } :asyncEnd` ) ;
237+ this . error = channel ( `tracing:${ nameOrChannels } :error` ) ;
238+ } else if ( typeof nameOrChannels === 'object' ) {
239+ const { start, end, asyncStart, asyncEnd, error } = nameOrChannels ;
240+
241+ assertChannel ( start , 'nameOrChannels.start' ) ;
242+ assertChannel ( end , 'nameOrChannels.end' ) ;
243+ assertChannel ( asyncStart , 'nameOrChannels.asyncStart' ) ;
244+ assertChannel ( asyncEnd , 'nameOrChannels.asyncEnd' ) ;
245+ assertChannel ( error , 'nameOrChannels.error' ) ;
246+
247+ this . start = start ;
248+ this . end = end ;
249+ this . asyncStart = asyncStart ;
250+ this . asyncEnd = asyncEnd ;
251+ this . error = error ;
252+ } else {
253+ throw new ERR_INVALID_ARG_TYPE ( 'nameOrChannels' ,
254+ [ 'string' , 'object' , 'Channel' ] ,
255+ nameOrChannels ) ;
256+ }
257+ }
258+
259+ subscribe ( handlers ) {
260+ for ( const name of traceEvents ) {
261+ if ( ! handlers [ name ] ) continue ;
262+
263+ this [ name ] ?. subscribe ( handlers [ name ] ) ;
264+ }
265+ }
266+
267+ unsubscribe ( handlers ) {
268+ let done = true ;
269+
270+ for ( const name of traceEvents ) {
271+ if ( ! handlers [ name ] ) continue ;
272+
273+ if ( ! this [ name ] ?. unsubscribe ( handlers [ name ] ) ) {
274+ done = false ;
275+ }
276+ }
277+
278+ return done ;
279+ }
280+
281+ traceSync ( fn , ctx = { } , thisArg , ...args ) {
282+ const { start, end, error } = this ;
283+
284+ try {
285+ const result = start . runStores ( ctx , fn , thisArg , ...args ) ;
286+ ctx . result = result ;
287+ return result ;
288+ } catch ( err ) {
289+ ctx . error = err ;
290+ error . publish ( ctx ) ;
291+ throw err ;
292+ } finally {
293+ end . publish ( ctx ) ;
294+ }
295+ }
296+
297+ tracePromise ( fn , ctx = { } , thisArg , ...args ) {
298+ const { start, end, asyncStart, asyncEnd, error } = this ;
299+
300+ function reject ( err ) {
301+ ctx . error = err ;
302+ error . publish ( ctx ) ;
303+ asyncStart . publish ( ctx ) ;
304+ // TODO: Is there a way to have asyncEnd _after_ the continuation?
305+ asyncEnd . publish ( ctx ) ;
306+ return PromiseReject ( err ) ;
307+ }
308+
309+ function resolve ( result ) {
310+ ctx . result = result ;
311+ asyncStart . publish ( ctx ) ;
312+ // TODO: Is there a way to have asyncEnd _after_ the continuation?
313+ asyncEnd . publish ( ctx ) ;
314+ return result ;
315+ }
316+
317+ try {
318+ const promise = start . runStores ( ctx , fn , thisArg , ...args ) ;
319+ return PromisePrototypeThen ( promise , resolve , reject ) ;
320+ } catch ( err ) {
321+ ctx . error = err ;
322+ error . publish ( ctx ) ;
323+ throw err ;
324+ } finally {
325+ end . publish ( ctx ) ;
326+ }
327+ }
328+
329+ traceCallback ( fn , position = 0 , ctx = { } , thisArg , ...args ) {
330+ const { start, end, asyncStart, asyncEnd, error } = this ;
331+
332+ function wrappedCallback ( err , res ) {
333+ if ( err ) {
334+ ctx . error = err ;
335+ error . publish ( ctx ) ;
336+ } else {
337+ ctx . result = res ;
338+ }
339+
340+ asyncStart . publish ( ctx ) ;
341+ try {
342+ if ( callback ) {
343+ return ReflectApply ( callback , this , arguments ) ;
344+ }
345+ } finally {
346+ asyncEnd . publish ( ctx ) ;
347+ }
348+ }
349+
350+ const callback = ArrayPrototypeAt ( args , position ) ;
351+ ArrayPrototypeSplice ( args , position , 1 , wrappedCallback ) ;
352+
353+ try {
354+ return start . runStores ( ctx , fn , thisArg , ...args ) ;
355+ } catch ( err ) {
356+ ctx . error = err ;
357+ error . publish ( ctx ) ;
358+ throw err ;
359+ } finally {
360+ end . publish ( ctx ) ;
361+ }
362+ }
363+ }
364+
365+ function tracingChannel ( nameOrChannels ) {
366+ return new TracingChannel ( nameOrChannels ) ;
367+ }
368+
142369module . exports = {
143370 channel,
144371 hasSubscribers,
145372 subscribe,
373+ tracingChannel,
146374 unsubscribe,
147- Channel
375+ Channel,
376+ TracingChannel
148377} ;
0 commit comments