Skip to content

Commit ac655d8

Browse files
author
Stephen Belanger
committed
lib: add tracing channel to diagnostics_channel
1 parent 22c645d commit ac655d8

6 files changed

+481
-20
lines changed

lib/diagnostics_channel.js

Lines changed: 248 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ const {
44
ArrayPrototypeIndexOf,
55
ArrayPrototypePush,
66
ArrayPrototypeSplice,
7+
FunctionPrototypeBind,
78
ObjectCreate,
89
ObjectGetPrototypeOf,
910
ObjectSetPrototypeOf,
11+
PromisePrototypeThen,
12+
ReflectApply,
1013
SymbolHasInstance,
1114
} = primordials;
1215

@@ -23,11 +26,35 @@ const { triggerUncaughtException } = internalBinding('errors');
2326

2427
const { WeakReference } = internalBinding('util');
2528

29+
function decRef(channel) {
30+
channel._weak.decRef();
31+
if (channels[channel.name].getRef() === 0) {
32+
delete channels[channel.name];
33+
}
34+
}
35+
36+
function markActive(channel) {
37+
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
38+
channel._subscribers = [];
39+
channel._stores = new Map();
40+
}
41+
42+
function maybeMarkInactive(channel) {
43+
// When there are no more active subscribers, restore to fast prototype.
44+
if (!channel._subscribers.length && !channel._stores.size) {
45+
// eslint-disable-next-line no-use-before-define
46+
ObjectSetPrototypeOf(channel, Channel.prototype);
47+
channel._subscribers = undefined;
48+
channel._stores = undefined;
49+
}
50+
}
51+
2652
// TODO(qard): should there be a C++ channel interface?
2753
class ActiveChannel {
2854
subscribe(subscription) {
2955
validateFunction(subscription, 'subscription');
3056
ArrayPrototypePush(this._subscribers, subscription);
57+
this._weak.incRef();
3158
}
3259

3360
unsubscribe(subscription) {
@@ -36,11 +63,29 @@ class ActiveChannel {
3663

3764
ArrayPrototypeSplice(this._subscribers, index, 1);
3865

39-
// When there are no more active subscribers, restore to fast prototype.
40-
if (!this._subscribers.length) {
41-
// eslint-disable-next-line no-use-before-define
42-
ObjectSetPrototypeOf(this, Channel.prototype);
66+
decRef(this);
67+
maybeMarkInactive(this);
68+
69+
return true;
70+
}
71+
72+
bindStore(store, transform) {
73+
const replacing = this._stores.has(store);
74+
this._stores.set(store, transform);
75+
if (!replacing) {
76+
this._weak.incRef();
4377
}
78+
}
79+
80+
unbindStore(store) {
81+
if (!this._stores.has(store)) {
82+
return false;
83+
}
84+
85+
this._stores.delete(store);
86+
87+
decRef(this);
88+
maybeMarkInactive(this);
4489

4590
return true;
4691
}
@@ -61,11 +106,26 @@ class ActiveChannel {
61106
}
62107
}
63108
}
109+
110+
runStores(data, fn, thisArg, ...args) {
111+
this.publish(data);
112+
113+
// Bind base fn first due to AsyncLocalStorage.run not having thisArg
114+
fn = FunctionPrototypeBind(fn, thisArg, ...args);
115+
116+
for (const [ store, transform ] of this._stores.entries()) {
117+
fn = wrapStoreRun(store, data, fn, transform);
118+
}
119+
120+
return fn();
121+
}
64122
}
65123

66124
class Channel {
67125
constructor(name) {
68126
this._subscribers = undefined;
127+
this._stores = undefined;
128+
this._weak = undefined;
69129
this.name = name;
70130
}
71131

@@ -76,20 +136,36 @@ class Channel {
76136
}
77137

78138
subscribe(subscription) {
79-
ObjectSetPrototypeOf(this, ActiveChannel.prototype);
80-
this._subscribers = [];
139+
markActive(this);
81140
this.subscribe(subscription);
82141
}
83142

84143
unsubscribe() {
85144
return false;
86145
}
87146

147+
bindStore(store, transform = (v) => v) {
148+
markActive(this);
149+
this.bindStore(store, transform);
150+
}
151+
152+
unbindStore() {
153+
return false;
154+
}
155+
88156
get hasSubscribers() {
89157
return false;
90158
}
91159

92160
publish() {}
161+
162+
runStores(data, fn, thisArg, ...args) {
163+
return ReflectApply(fn, thisArg, args)
164+
}
165+
}
166+
167+
function wrapStoreRun(store, data, next, transform = (v) => v) {
168+
return () => store.run(transform(data), next);
93169
}
94170

95171
const channels = ObjectCreate(null);
@@ -105,27 +181,17 @@ function channel(name) {
105181
}
106182

107183
channel = new Channel(name);
108-
channels[name] = new WeakReference(channel);
184+
channel._weak = new WeakReference(channel);
185+
channels[name] = channel._weak;
109186
return channel;
110187
}
111188

112189
function subscribe(name, subscription) {
113-
const chan = channel(name);
114-
channels[name].incRef();
115-
chan.subscribe(subscription);
190+
return channel(name).subscribe(subscription);
116191
}
117192

118193
function unsubscribe(name, subscription) {
119-
const chan = channel(name);
120-
if (!chan.unsubscribe(subscription)) {
121-
return false;
122-
}
123-
124-
channels[name].decRef();
125-
if (channels[name].getRef() === 0) {
126-
delete channels[name];
127-
}
128-
return true;
194+
return channel(name).unsubscribe(subscription);
129195
}
130196

131197
function hasSubscribers(name) {
@@ -139,10 +205,172 @@ function hasSubscribers(name) {
139205
return channel.hasSubscribers;
140206
}
141207

208+
function traceSync(channels, fn, ctx, thisArg, ...args) {
209+
const { start, end, error } = channels;
210+
211+
try {
212+
const result = start.runStores(ctx, fn, thisArg, ...args);
213+
ctx.result = result;
214+
return result;
215+
} catch (err) {
216+
ctx.error = err;
217+
error.publish(ctx);
218+
throw err;
219+
} finally {
220+
end.publish(ctx);
221+
}
222+
}
223+
224+
function traceCallback(channels, fn, position, ctx, thisArg, ...args) {
225+
const { start, end, asyncEnd, error } = channels;
226+
227+
function wrap (fn) {
228+
return function wrappedCallback (err, res) {
229+
if (err) {
230+
ctx.error = err;
231+
error.publish(ctx);
232+
} else {
233+
ctx.result = res;
234+
}
235+
236+
asyncEnd.publish(ctx);
237+
if (fn) {
238+
return ReflectApply(fn, this, arguments);
239+
}
240+
}
241+
}
242+
243+
if (position >= 0) {
244+
args.splice(position, 1, wrap(args.at(position)));
245+
}
246+
247+
try {
248+
return start.runStores(ctx, fn, thisArg, ...args);
249+
} catch (err) {
250+
ctx.error = err;
251+
error.publish(ctx);
252+
throw err;
253+
} finally {
254+
end.publish(ctx);
255+
}
256+
}
257+
258+
function tracePromise(channels, fn, ctx, thisArg, ...args) {
259+
const { asyncEnd, start, end, error } = channels;
260+
261+
function reject(err) {
262+
ctx.error = err;
263+
error.publish(ctx);
264+
asyncEnd.publish(ctx);
265+
return Promise.reject(err);
266+
}
267+
268+
function resolve(result) {
269+
ctx.result = result;
270+
asyncEnd.publish(ctx);
271+
return result;
272+
}
273+
274+
try {
275+
const promise = start.runStores(ctx, fn, thisArg, ...args);
276+
return PromisePrototypeThen(promise, resolve, reject);
277+
} catch (err) {
278+
ctx.error = err;
279+
error.publish(ctx);
280+
throw err;
281+
} finally {
282+
end.publish(ctx);
283+
}
284+
}
285+
286+
class TracingChannel {
287+
constructor(name) {
288+
this.name = name;
289+
this.channels = {
290+
start: new Channel(`tracing:${name}:start`),
291+
end: new Channel(`tracing:${name}:end`),
292+
asyncEnd: new Channel(`tracing:${name}:asyncEnd`),
293+
error: new Channel(`tracing:${name}:error`)
294+
};
295+
}
296+
297+
// Attach WeakReference to all the sub-channels so the liveness management
298+
// in subscribe/unsubscribe keeps the TracingChannel the sub-channels are
299+
// attached to alive.
300+
set _weak(weak) {
301+
for (const key in this.channels) {
302+
this.channels[key]._weak = weak;
303+
}
304+
}
305+
306+
get hasSubscribers() {
307+
for (const key in this.channels) {
308+
if (this.channels[key].hasSubscribers) {
309+
return true;
310+
}
311+
}
312+
return false;
313+
}
314+
315+
subscribe(handlers) {
316+
for (const key in handlers) {
317+
this.channels[key]?.subscribe(handlers[key]);
318+
}
319+
}
320+
321+
unsubscribe(handlers) {
322+
let done = true;
323+
for (const key in handlers) {
324+
const channel = this.channels[key];
325+
if (channel instanceof Channel && !channel.unsubscribe(handlers[key])) {
326+
done = false;
327+
}
328+
}
329+
return done;
330+
}
331+
332+
traceSync(fn, ctx = {}, thisArg, ...args) {
333+
if (!this.hasSubscribers) return ReflectApply(fn, thisArg, args);
334+
return traceSync(this.channels, fn, ctx, thisArg, ...args);
335+
}
336+
337+
tracePromise(fn, ctx = {}, thisArg, ...args) {
338+
if (!this.hasSubscribers) return ReflectApply(fn, thisArg, args);
339+
return tracePromise(this.channels, fn, ctx, thisArg, ...args);
340+
}
341+
342+
traceCallback(fn, position = 0, ctx = {}, thisArg, ...args) {
343+
if (!this.hasSubscribers) return ReflectApply(fn, thisArg, args);
344+
return traceCallback(this.channels, fn, position, ctx, thisArg, ...args);
345+
}
346+
}
347+
348+
const tracingChannels = ObjectCreate(null);
349+
350+
function tracingChannel(name) {
351+
let channel;
352+
const ref = tracingChannels[name];
353+
if (ref) channel = ref.get();
354+
if (channel) return channel;
355+
356+
if (typeof name !== 'string' && typeof name !== 'symbol') {
357+
throw new ERR_INVALID_ARG_TYPE('tracingChannel', ['string', 'symbol'], name);
358+
}
359+
360+
channel = new TracingChannel(name);
361+
channel._weak = new WeakReference(channel);
362+
tracingChannels[name] = channel._weak;
363+
return channel;
364+
}
365+
142366
module.exports = {
143367
channel,
144368
hasSubscribers,
145369
subscribe,
370+
tracingChannel,
371+
traceSync,
372+
traceCallback,
373+
tracePromise,
146374
unsubscribe,
147375
Channel
148376
};

0 commit comments

Comments
 (0)