Skip to content

Commit 8765e7d

Browse files
author
Stephen Belanger
committed
lib: add tracing channel to diagnostics_channel
1 parent 22c645d commit 8765e7d

8 files changed

+934
-21
lines changed

doc/api/diagnostics_channel.md

Lines changed: 448 additions & 0 deletions
Large diffs are not rendered by default.

lib/diagnostics_channel.js

Lines changed: 230 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ const {
44
ArrayPrototypeIndexOf,
55
ArrayPrototypePush,
66
ArrayPrototypeSplice,
7+
FunctionPrototypeBind,
78
ObjectCreate,
89
ObjectGetPrototypeOf,
910
ObjectSetPrototypeOf,
11+
PromisePrototypeThen,
12+
ReflectApply,
1013
SymbolHasInstance,
1114
} = primordials;
1215

@@ -23,11 +26,39 @@ const { triggerUncaughtException } = internalBinding('errors');
2326

2427
const { WeakReference } = internalBinding('util');
2528

29+
function decRef(channel) {
30+
channel._weak.decRef();
31+
if (channel._weak.getRef() === 0) {
32+
delete channels[channel.name];
33+
}
34+
}
35+
36+
function markActive(channel) {
37+
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
38+
channel._subscribers = [];
39+
channel._stores = new Map();
40+
}
41+
42+
function maybeMarkInactive(channel) {
43+
// When there are no more active subscribers, restore to fast prototype.
44+
if (!channel._subscribers.length && !channel._stores.size) {
45+
// eslint-disable-next-line no-use-before-define
46+
ObjectSetPrototypeOf(channel, Channel.prototype);
47+
channel._subscribers = undefined;
48+
channel._stores = undefined;
49+
}
50+
}
51+
52+
function wrapStoreRun(store, data, next, transform = (v) => v) {
53+
return () => store.run(transform(data), next);
54+
}
55+
2656
// TODO(qard): should there be a C++ channel interface?
2757
class ActiveChannel {
2858
subscribe(subscription) {
2959
validateFunction(subscription, 'subscription');
3060
ArrayPrototypePush(this._subscribers, subscription);
61+
this._weak.incRef();
3162
}
3263

3364
unsubscribe(subscription) {
@@ -36,12 +67,28 @@ class ActiveChannel {
3667

3768
ArrayPrototypeSplice(this._subscribers, index, 1);
3869

39-
// When there are no more active subscribers, restore to fast prototype.
40-
if (!this._subscribers.length) {
41-
// eslint-disable-next-line no-use-before-define
42-
ObjectSetPrototypeOf(this, Channel.prototype);
70+
decRef(this);
71+
maybeMarkInactive(this);
72+
73+
return true;
74+
}
75+
76+
bindStore(store, transform) {
77+
const replacing = this._stores.has(store);
78+
if (!replacing) this._weak.incRef();
79+
this._stores.set(store, transform);
80+
}
81+
82+
unbindStore(store) {
83+
if (!this._stores.has(store)) {
84+
return false;
4385
}
4486

87+
this._stores.delete(store);
88+
89+
decRef(this);
90+
maybeMarkInactive(this);
91+
4592
return true;
4693
}
4794

@@ -61,11 +108,26 @@ class ActiveChannel {
61108
}
62109
}
63110
}
111+
112+
runStores(data, fn, thisArg, ...args) {
113+
this.publish(data);
114+
115+
// Bind base fn first due to AsyncLocalStorage.run not having thisArg
116+
fn = FunctionPrototypeBind(fn, thisArg, ...args);
117+
118+
for (const [ store, transform ] of this._stores.entries()) {
119+
fn = wrapStoreRun(store, data, fn, transform);
120+
}
121+
122+
return fn();
123+
}
64124
}
65125

66126
class Channel {
67127
constructor(name) {
68128
this._subscribers = undefined;
129+
this._stores = undefined;
130+
this._weak = undefined;
69131
this.name = name;
70132
}
71133

@@ -76,20 +138,32 @@ class Channel {
76138
}
77139

78140
subscribe(subscription) {
79-
ObjectSetPrototypeOf(this, ActiveChannel.prototype);
80-
this._subscribers = [];
141+
markActive(this);
81142
this.subscribe(subscription);
82143
}
83144

84145
unsubscribe() {
85146
return false;
86147
}
87148

149+
bindStore(store, transform = (v) => v) {
150+
markActive(this);
151+
this.bindStore(store, transform);
152+
}
153+
154+
unbindStore() {
155+
return false;
156+
}
157+
88158
get hasSubscribers() {
89159
return false;
90160
}
91161

92162
publish() {}
163+
164+
runStores(data, fn, thisArg, ...args) {
165+
return ReflectApply(fn, thisArg, args)
166+
}
93167
}
94168

95169
const channels = ObjectCreate(null);
@@ -105,27 +179,17 @@ function channel(name) {
105179
}
106180

107181
channel = new Channel(name);
108-
channels[name] = new WeakReference(channel);
182+
channel._weak = new WeakReference(channel);
183+
channels[name] = channel._weak;
109184
return channel;
110185
}
111186

112187
function subscribe(name, subscription) {
113-
const chan = channel(name);
114-
channels[name].incRef();
115-
chan.subscribe(subscription);
188+
return channel(name).subscribe(subscription);
116189
}
117190

118191
function unsubscribe(name, subscription) {
119-
const chan = channel(name);
120-
if (!chan.unsubscribe(subscription)) {
121-
return false;
122-
}
123-
124-
channels[name].decRef();
125-
if (channels[name].getRef() === 0) {
126-
delete channels[name];
127-
}
128-
return true;
192+
return channel(name).unsubscribe(subscription);
129193
}
130194

131195
function hasSubscribers(name) {
@@ -139,10 +203,155 @@ function hasSubscribers(name) {
139203
return channel.hasSubscribers;
140204
}
141205

206+
const traceEvents = [
207+
'start',
208+
'end',
209+
'asyncEnd',
210+
'error'
211+
];
212+
213+
function assertChannel(value, name) {
214+
if (!(value instanceof Channel)) {
215+
throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
216+
}
217+
}
218+
219+
class TracingChannel {
220+
constructor(nameOrChannels) {
221+
if (typeof nameOrChannels === 'string') {
222+
this.start = channel(`tracing:${nameOrChannels}:start`);
223+
this.end = channel(`tracing:${nameOrChannels}:end`);
224+
this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
225+
this.error = channel(`tracing:${nameOrChannels}:error`);
226+
} else if (typeof nameOrChannels === 'object') {
227+
const { start, end, asyncEnd, error } = nameOrChannels;
228+
229+
assertChannel(start, 'nameOrChannels.start');
230+
assertChannel(end, 'nameOrChannels.end');
231+
assertChannel(asyncEnd, 'nameOrChannels.asyncEnd');
232+
assertChannel(error, 'nameOrChannels.error');
233+
234+
this.start = start;
235+
this.end = end;
236+
this.asyncEnd = asyncEnd;
237+
this.error = error;
238+
} else {
239+
throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
240+
['string', 'object', 'Channel'], nameOrChannels);
241+
}
242+
}
243+
244+
subscribe(handlers) {
245+
for (const name of traceEvents) {
246+
if (!handlers[name]) continue;
247+
248+
this[name]?.subscribe(handlers[name]);
249+
}
250+
}
251+
252+
unsubscribe(handlers) {
253+
let done = true;
254+
255+
for (const name of traceEvents) {
256+
if (!handlers[name]) continue;
257+
258+
if (!this[name]?.unsubscribe(handlers[name])) {
259+
done = false;
260+
}
261+
}
262+
263+
return done;
264+
}
265+
266+
traceSync(fn, ctx = {}, thisArg, ...args) {
267+
const { start, end, error } = this;
268+
269+
try {
270+
const result = start.runStores(ctx, fn, thisArg, ...args);
271+
ctx.result = result;
272+
return result;
273+
} catch (err) {
274+
ctx.error = err;
275+
error.publish(ctx);
276+
throw err;
277+
} finally {
278+
end.publish(ctx);
279+
}
280+
}
281+
282+
tracePromise(fn, ctx = {}, thisArg, ...args) {
283+
const { asyncEnd, start, end, error } = this;
284+
285+
function reject (err) {
286+
ctx.error = err;
287+
error.publish(ctx);
288+
asyncEnd.publish(ctx);
289+
return Promise.reject(err);
290+
}
291+
292+
function resolve (result) {
293+
ctx.result = result;
294+
asyncEnd.publish(ctx);
295+
return result;
296+
}
297+
298+
try {
299+
const promise = start.runStores(ctx, fn, thisArg, ...args);
300+
return PromisePrototypeThen(promise, resolve, reject);
301+
} catch (err) {
302+
ctx.error = err;
303+
error.publish(ctx);
304+
throw err;
305+
} finally {
306+
end.publish(ctx);
307+
}
308+
}
309+
310+
traceCallback(fn, position = 0, ctx = {}, thisArg, ...args) {
311+
const { start, end, asyncEnd, error } = this;
312+
313+
function wrap (fn) {
314+
return function wrappedCallback (err, res) {
315+
if (err) {
316+
ctx.error = err;
317+
error.publish(ctx);
318+
} else {
319+
ctx.result = res;
320+
}
321+
322+
asyncEnd.publish(ctx);
323+
if (fn) {
324+
return ReflectApply(fn, this, arguments);
325+
}
326+
}
327+
}
328+
329+
if (position >= 0) {
330+
args.splice(position, 1, wrap(args.at(position)));
331+
}
332+
333+
try {
334+
return start.runStores(ctx, fn, thisArg, ...args);
335+
} catch (err) {
336+
ctx.error = err;
337+
error.publish(ctx);
338+
throw err;
339+
} finally {
340+
end.publish(ctx);
341+
}
342+
}
343+
}
344+
345+
function tracingChannel(nameOrChannels) {
346+
return new TracingChannel(nameOrChannels);
347+
}
348+
142349
module.exports = {
143350
channel,
144351
hasSubscribers,
145352
subscribe,
353+
tracingChannel,
146354
unsubscribe,
147-
Channel
355+
Channel,
356+
TracingChannel
148357
};

0 commit comments

Comments
 (0)