Skip to content

Commit 9128df4

Browse files
author
Stephen Belanger
committed
lib: add tracing channel to diagnostics_channel
1 parent 22c645d commit 9128df4

8 files changed

+984
-21
lines changed

doc/api/diagnostics_channel.md

Lines changed: 472 additions & 0 deletions
Large diffs are not rendered by default.

lib/diagnostics_channel.js

Lines changed: 248 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@ const {
44
ArrayPrototypeIndexOf,
55
ArrayPrototypePush,
66
ArrayPrototypeSplice,
7+
FunctionPrototypeBind,
78
ObjectCreate,
89
ObjectGetPrototypeOf,
910
ObjectSetPrototypeOf,
11+
PromisePrototypeThen,
12+
PromiseReject,
13+
ReflectApply,
14+
SafeMap,
1015
SymbolHasInstance,
1116
} = primordials;
1217

@@ -23,11 +28,40 @@ const { triggerUncaughtException } = internalBinding('errors');
2328

2429
const { WeakReference } = internalBinding('util');
2530

31+
function decRef(channel) {
32+
channel._weak.decRef();
33+
if (channel._weak.getRef() === 0) {
34+
delete channels[channel.name];
35+
}
36+
}
37+
38+
function markActive(channel) {
39+
// eslint-disable-next-line no-use-before-define
40+
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
41+
channel._subscribers = [];
42+
channel._stores = new SafeMap();
43+
}
44+
45+
function maybeMarkInactive(channel) {
46+
// When there are no more active subscribers, restore to fast prototype.
47+
if (!channel._subscribers.length && !channel._stores.size) {
48+
// eslint-disable-next-line no-use-before-define
49+
ObjectSetPrototypeOf(channel, Channel.prototype);
50+
channel._subscribers = undefined;
51+
channel._stores = undefined;
52+
}
53+
}
54+
55+
function wrapStoreRun(store, data, next, transform = (v) => v) {
56+
return () => store.run(transform(data), next);
57+
}
58+
2659
// TODO(qard): should there be a C++ channel interface?
2760
class ActiveChannel {
2861
subscribe(subscription) {
2962
validateFunction(subscription, 'subscription');
3063
ArrayPrototypePush(this._subscribers, subscription);
64+
this._weak.incRef();
3165
}
3266

3367
unsubscribe(subscription) {
@@ -36,12 +70,28 @@ class ActiveChannel {
3670

3771
ArrayPrototypeSplice(this._subscribers, index, 1);
3872

39-
// When there are no more active subscribers, restore to fast prototype.
40-
if (!this._subscribers.length) {
41-
// eslint-disable-next-line no-use-before-define
42-
ObjectSetPrototypeOf(this, Channel.prototype);
73+
decRef(this);
74+
maybeMarkInactive(this);
75+
76+
return true;
77+
}
78+
79+
bindStore(store, transform) {
80+
const replacing = this._stores.has(store);
81+
if (!replacing) this._weak.incRef();
82+
this._stores.set(store, transform);
83+
}
84+
85+
unbindStore(store) {
86+
if (!this._stores.has(store)) {
87+
return false;
4388
}
4489

90+
this._stores.delete(store);
91+
92+
decRef(this);
93+
maybeMarkInactive(this);
94+
4595
return true;
4696
}
4797

@@ -61,11 +111,28 @@ class ActiveChannel {
61111
}
62112
}
63113
}
114+
115+
runStores(data, fn, thisArg, ...args) {
116+
this.publish(data);
117+
118+
// Bind base fn first due to AsyncLocalStorage.run not having thisArg
119+
fn = FunctionPrototypeBind(fn, thisArg, ...args);
120+
121+
for (const entry of this._stores.entries()) {
122+
const store = entry[0];
123+
const transform = entry[1];
124+
fn = wrapStoreRun(store, data, fn, transform);
125+
}
126+
127+
return fn();
128+
}
64129
}
65130

66131
class Channel {
67132
constructor(name) {
68133
this._subscribers = undefined;
134+
this._stores = undefined;
135+
this._weak = undefined;
69136
this.name = name;
70137
}
71138

@@ -76,20 +143,32 @@ class Channel {
76143
}
77144

78145
subscribe(subscription) {
79-
ObjectSetPrototypeOf(this, ActiveChannel.prototype);
80-
this._subscribers = [];
146+
markActive(this);
81147
this.subscribe(subscription);
82148
}
83149

84150
unsubscribe() {
85151
return false;
86152
}
87153

154+
bindStore(store, transform = (v) => v) {
155+
markActive(this);
156+
this.bindStore(store, transform);
157+
}
158+
159+
unbindStore() {
160+
return false;
161+
}
162+
88163
get hasSubscribers() {
89164
return false;
90165
}
91166

92167
publish() {}
168+
169+
runStores(data, fn, thisArg, ...args) {
170+
return ReflectApply(fn, thisArg, args);
171+
}
93172
}
94173

95174
const channels = ObjectCreate(null);
@@ -105,27 +184,17 @@ function channel(name) {
105184
}
106185

107186
channel = new Channel(name);
108-
channels[name] = new WeakReference(channel);
187+
channel._weak = new WeakReference(channel);
188+
channels[name] = channel._weak;
109189
return channel;
110190
}
111191

112192
function subscribe(name, subscription) {
113-
const chan = channel(name);
114-
channels[name].incRef();
115-
chan.subscribe(subscription);
193+
return channel(name).subscribe(subscription);
116194
}
117195

118196
function unsubscribe(name, subscription) {
119-
const chan = channel(name);
120-
if (!chan.unsubscribe(subscription)) {
121-
return false;
122-
}
123-
124-
channels[name].decRef();
125-
if (channels[name].getRef() === 0) {
126-
delete channels[name];
127-
}
128-
return true;
197+
return channel(name).unsubscribe(subscription);
129198
}
130199

131200
function hasSubscribers(name) {
@@ -139,10 +208,168 @@ function hasSubscribers(name) {
139208
return channel.hasSubscribers;
140209
}
141210

211+
const traceEvents = [
212+
'start',
213+
'end',
214+
'asyncStart',
215+
'asyncEnd',
216+
'error',
217+
];
218+
219+
function assertChannel(value, name) {
220+
if (!(value instanceof Channel)) {
221+
throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
222+
}
223+
}
224+
225+
class TracingChannel {
226+
constructor(nameOrChannels) {
227+
if (typeof nameOrChannels === 'string') {
228+
this.start = channel(`tracing:${nameOrChannels}:start`);
229+
this.end = channel(`tracing:${nameOrChannels}:end`);
230+
this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`);
231+
this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
232+
this.error = channel(`tracing:${nameOrChannels}:error`);
233+
} else if (typeof nameOrChannels === 'object') {
234+
const { start, end, asyncStart, asyncEnd, error } = nameOrChannels;
235+
236+
assertChannel(start, 'nameOrChannels.start');
237+
assertChannel(end, 'nameOrChannels.end');
238+
assertChannel(asyncStart, 'nameOrChannels.asyncStart');
239+
assertChannel(asyncEnd, 'nameOrChannels.asyncEnd');
240+
assertChannel(error, 'nameOrChannels.error');
241+
242+
this.start = start;
243+
this.end = end;
244+
this.asyncStart = asyncStart;
245+
this.asyncEnd = asyncEnd;
246+
this.error = error;
247+
} else {
248+
throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
249+
['string', 'object', 'Channel'],
250+
nameOrChannels);
251+
}
252+
}
253+
254+
subscribe(handlers) {
255+
for (const name of traceEvents) {
256+
if (!handlers[name]) continue;
257+
258+
this[name]?.subscribe(handlers[name]);
259+
}
260+
}
261+
262+
unsubscribe(handlers) {
263+
let done = true;
264+
265+
for (const name of traceEvents) {
266+
if (!handlers[name]) continue;
267+
268+
if (!this[name]?.unsubscribe(handlers[name])) {
269+
done = false;
270+
}
271+
}
272+
273+
return done;
274+
}
275+
276+
traceSync(fn, ctx = {}, thisArg, ...args) {
277+
const { start, end, error } = this;
278+
279+
try {
280+
const result = start.runStores(ctx, fn, thisArg, ...args);
281+
ctx.result = result;
282+
return result;
283+
} catch (err) {
284+
ctx.error = err;
285+
error.publish(ctx);
286+
throw err;
287+
} finally {
288+
end.publish(ctx);
289+
}
290+
}
291+
292+
tracePromise(fn, ctx = {}, thisArg, ...args) {
293+
const { start, end, asyncStart, asyncEnd, error } = this;
294+
295+
function reject(err) {
296+
ctx.error = err;
297+
error.publish(ctx);
298+
asyncStart.publish(ctx);
299+
// TODO: Is there a way to have asyncEnd _after_ the continuation?
300+
asyncEnd.publish(ctx);
301+
return PromiseReject(err);
302+
}
303+
304+
function resolve(result) {
305+
ctx.result = result;
306+
asyncStart.publish(ctx);
307+
// TODO: Is there a way to have asyncEnd _after_ the continuation?
308+
asyncEnd.publish(ctx);
309+
return result;
310+
}
311+
312+
try {
313+
const promise = start.runStores(ctx, fn, thisArg, ...args);
314+
return PromisePrototypeThen(promise, resolve, reject);
315+
} catch (err) {
316+
ctx.error = err;
317+
error.publish(ctx);
318+
throw err;
319+
} finally {
320+
end.publish(ctx);
321+
}
322+
}
323+
324+
traceCallback(fn, position = 0, ctx = {}, thisArg, ...args) {
325+
const { start, end, asyncStart, asyncEnd, error } = this;
326+
327+
function wrap(fn) {
328+
return function wrappedCallback(err, res) {
329+
if (err) {
330+
ctx.error = err;
331+
error.publish(ctx);
332+
} else {
333+
ctx.result = res;
334+
}
335+
336+
asyncStart.publish(ctx);
337+
try {
338+
if (fn) {
339+
return ReflectApply(fn, this, arguments);
340+
}
341+
} finally {
342+
asyncEnd.publish(ctx);
343+
}
344+
};
345+
}
346+
347+
if (position >= 0) {
348+
args.splice(position, 1, wrap(args.at(position)));
349+
}
350+
351+
try {
352+
return start.runStores(ctx, fn, thisArg, ...args);
353+
} catch (err) {
354+
ctx.error = err;
355+
error.publish(ctx);
356+
throw err;
357+
} finally {
358+
end.publish(ctx);
359+
}
360+
}
361+
}
362+
363+
function tracingChannel(nameOrChannels) {
364+
return new TracingChannel(nameOrChannels);
365+
}
366+
142367
module.exports = {
143368
channel,
144369
hasSubscribers,
145370
subscribe,
371+
tracingChannel,
146372
unsubscribe,
147-
Channel
373+
Channel,
374+
TracingChannel
148375
};

0 commit comments

Comments
 (0)