Skip to content

Commit 1d8b89e

Browse files
author
Stephen Belanger
committed
lib: add tracing channel to diagnostics_channel
1 parent 22c645d commit 1d8b89e

8 files changed

+1076
-21
lines changed

doc/api/diagnostics_channel.md

Lines changed: 560 additions & 0 deletions
Large diffs are not rendered by default.

lib/diagnostics_channel.js

Lines changed: 252 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@ const {
44
ArrayPrototypeIndexOf,
55
ArrayPrototypePush,
66
ArrayPrototypeSplice,
7+
FunctionPrototypeBind,
78
ObjectCreate,
89
ObjectGetPrototypeOf,
910
ObjectSetPrototypeOf,
11+
PromisePrototypeThen,
12+
PromiseReject,
13+
ReflectApply,
14+
SafeMap,
1015
SymbolHasInstance,
1116
} = primordials;
1217

@@ -23,11 +28,44 @@ const { triggerUncaughtException } = internalBinding('errors');
2328

2429
const { WeakReference } = internalBinding('util');
2530

31+
function decRef(channel) {
32+
channel._weak.decRef();
33+
if (channel._weak.getRef() === 0) {
34+
delete channels[channel.name];
35+
}
36+
}
37+
38+
function markActive(channel) {
39+
// eslint-disable-next-line no-use-before-define
40+
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
41+
channel._subscribers = [];
42+
channel._stores = new SafeMap();
43+
}
44+
45+
function maybeMarkInactive(channel) {
46+
// When there are no more active subscribers, restore to fast prototype.
47+
if (!channel._subscribers.length && !channel._stores.size) {
48+
// eslint-disable-next-line no-use-before-define
49+
ObjectSetPrototypeOf(channel, Channel.prototype);
50+
channel._subscribers = undefined;
51+
channel._stores = undefined;
52+
}
53+
}
54+
55+
function defaultTransform(data) {
56+
return data
57+
}
58+
59+
function wrapStoreRun(store, data, next, transform = defaultTransform) {
60+
return () => store.run(transform(data), next);
61+
}
62+
2663
// TODO(qard): should there be a C++ channel interface?
2764
class ActiveChannel {
2865
subscribe(subscription) {
2966
validateFunction(subscription, 'subscription');
3067
ArrayPrototypePush(this._subscribers, subscription);
68+
this._weak.incRef();
3169
}
3270

3371
unsubscribe(subscription) {
@@ -36,12 +74,28 @@ class ActiveChannel {
3674

3775
ArrayPrototypeSplice(this._subscribers, index, 1);
3876

39-
// When there are no more active subscribers, restore to fast prototype.
40-
if (!this._subscribers.length) {
41-
// eslint-disable-next-line no-use-before-define
42-
ObjectSetPrototypeOf(this, Channel.prototype);
77+
decRef(this);
78+
maybeMarkInactive(this);
79+
80+
return true;
81+
}
82+
83+
bindStore(store, transform) {
84+
const replacing = this._stores.has(store);
85+
if (!replacing) this._weak.incRef();
86+
this._stores.set(store, transform);
87+
}
88+
89+
unbindStore(store) {
90+
if (!this._stores.has(store)) {
91+
return false;
4392
}
4493

94+
this._stores.delete(store);
95+
96+
decRef(this);
97+
maybeMarkInactive(this);
98+
4599
return true;
46100
}
47101

@@ -61,11 +115,28 @@ class ActiveChannel {
61115
}
62116
}
63117
}
118+
119+
runStores(data, fn, thisArg, ...args) {
120+
this.publish(data);
121+
122+
// Bind base fn first due to AsyncLocalStorage.run not having thisArg
123+
fn = FunctionPrototypeBind(fn, thisArg, ...args);
124+
125+
for (const entry of this._stores.entries()) {
126+
const store = entry[0];
127+
const transform = entry[1];
128+
fn = wrapStoreRun(store, data, fn, transform);
129+
}
130+
131+
return fn();
132+
}
64133
}
65134

66135
class Channel {
67136
constructor(name) {
68137
this._subscribers = undefined;
138+
this._stores = undefined;
139+
this._weak = undefined;
69140
this.name = name;
70141
}
71142

@@ -76,20 +147,32 @@ class Channel {
76147
}
77148

78149
subscribe(subscription) {
79-
ObjectSetPrototypeOf(this, ActiveChannel.prototype);
80-
this._subscribers = [];
150+
markActive(this);
81151
this.subscribe(subscription);
82152
}
83153

84154
unsubscribe() {
85155
return false;
86156
}
87157

158+
bindStore(store, transform = (v) => v) {
159+
markActive(this);
160+
this.bindStore(store, transform);
161+
}
162+
163+
unbindStore() {
164+
return false;
165+
}
166+
88167
get hasSubscribers() {
89168
return false;
90169
}
91170

92171
publish() {}
172+
173+
runStores(data, fn, thisArg, ...args) {
174+
return ReflectApply(fn, thisArg, args);
175+
}
93176
}
94177

95178
const channels = ObjectCreate(null);
@@ -105,27 +188,17 @@ function channel(name) {
105188
}
106189

107190
channel = new Channel(name);
108-
channels[name] = new WeakReference(channel);
191+
channel._weak = new WeakReference(channel);
192+
channels[name] = channel._weak;
109193
return channel;
110194
}
111195

112196
function subscribe(name, subscription) {
113-
const chan = channel(name);
114-
channels[name].incRef();
115-
chan.subscribe(subscription);
197+
return channel(name).subscribe(subscription);
116198
}
117199

118200
function unsubscribe(name, subscription) {
119-
const chan = channel(name);
120-
if (!chan.unsubscribe(subscription)) {
121-
return false;
122-
}
123-
124-
channels[name].decRef();
125-
if (channels[name].getRef() === 0) {
126-
delete channels[name];
127-
}
128-
return true;
201+
return channel(name).unsubscribe(subscription);
129202
}
130203

131204
function hasSubscribers(name) {
@@ -139,10 +212,168 @@ function hasSubscribers(name) {
139212
return channel.hasSubscribers;
140213
}
141214

215+
const traceEvents = [
216+
'start',
217+
'end',
218+
'asyncStart',
219+
'asyncEnd',
220+
'error',
221+
];
222+
223+
function assertChannel(value, name) {
224+
if (!(value instanceof Channel)) {
225+
throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
226+
}
227+
}
228+
229+
class TracingChannel {
230+
constructor(nameOrChannels) {
231+
if (typeof nameOrChannels === 'string') {
232+
this.start = channel(`tracing:${nameOrChannels}:start`);
233+
this.end = channel(`tracing:${nameOrChannels}:end`);
234+
this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`);
235+
this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
236+
this.error = channel(`tracing:${nameOrChannels}:error`);
237+
} else if (typeof nameOrChannels === 'object') {
238+
const { start, end, asyncStart, asyncEnd, error } = nameOrChannels;
239+
240+
assertChannel(start, 'nameOrChannels.start');
241+
assertChannel(end, 'nameOrChannels.end');
242+
assertChannel(asyncStart, 'nameOrChannels.asyncStart');
243+
assertChannel(asyncEnd, 'nameOrChannels.asyncEnd');
244+
assertChannel(error, 'nameOrChannels.error');
245+
246+
this.start = start;
247+
this.end = end;
248+
this.asyncStart = asyncStart;
249+
this.asyncEnd = asyncEnd;
250+
this.error = error;
251+
} else {
252+
throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
253+
['string', 'object', 'Channel'],
254+
nameOrChannels);
255+
}
256+
}
257+
258+
subscribe(handlers) {
259+
for (const name of traceEvents) {
260+
if (!handlers[name]) continue;
261+
262+
this[name]?.subscribe(handlers[name]);
263+
}
264+
}
265+
266+
unsubscribe(handlers) {
267+
let done = true;
268+
269+
for (const name of traceEvents) {
270+
if (!handlers[name]) continue;
271+
272+
if (!this[name]?.unsubscribe(handlers[name])) {
273+
done = false;
274+
}
275+
}
276+
277+
return done;
278+
}
279+
280+
traceSync(fn, ctx = {}, thisArg, ...args) {
281+
const { start, end, error } = this;
282+
283+
try {
284+
const result = start.runStores(ctx, fn, thisArg, ...args);
285+
ctx.result = result;
286+
return result;
287+
} catch (err) {
288+
ctx.error = err;
289+
error.publish(ctx);
290+
throw err;
291+
} finally {
292+
end.publish(ctx);
293+
}
294+
}
295+
296+
tracePromise(fn, ctx = {}, thisArg, ...args) {
297+
const { start, end, asyncStart, asyncEnd, error } = this;
298+
299+
function reject(err) {
300+
ctx.error = err;
301+
error.publish(ctx);
302+
asyncStart.publish(ctx);
303+
// TODO: Is there a way to have asyncEnd _after_ the continuation?
304+
asyncEnd.publish(ctx);
305+
return PromiseReject(err);
306+
}
307+
308+
function resolve(result) {
309+
ctx.result = result;
310+
asyncStart.publish(ctx);
311+
// TODO: Is there a way to have asyncEnd _after_ the continuation?
312+
asyncEnd.publish(ctx);
313+
return result;
314+
}
315+
316+
try {
317+
const promise = start.runStores(ctx, fn, thisArg, ...args);
318+
return PromisePrototypeThen(promise, resolve, reject);
319+
} catch (err) {
320+
ctx.error = err;
321+
error.publish(ctx);
322+
throw err;
323+
} finally {
324+
end.publish(ctx);
325+
}
326+
}
327+
328+
traceCallback(fn, position = 0, ctx = {}, thisArg, ...args) {
329+
const { start, end, asyncStart, asyncEnd, error } = this;
330+
331+
function wrap(fn) {
332+
return function wrappedCallback(err, res) {
333+
if (err) {
334+
ctx.error = err;
335+
error.publish(ctx);
336+
} else {
337+
ctx.result = res;
338+
}
339+
340+
asyncStart.publish(ctx);
341+
try {
342+
if (fn) {
343+
return ReflectApply(fn, this, arguments);
344+
}
345+
} finally {
346+
asyncEnd.publish(ctx);
347+
}
348+
};
349+
}
350+
351+
if (position >= 0) {
352+
args.splice(position, 1, wrap(args.at(position)));
353+
}
354+
355+
try {
356+
return start.runStores(ctx, fn, thisArg, ...args);
357+
} catch (err) {
358+
ctx.error = err;
359+
error.publish(ctx);
360+
throw err;
361+
} finally {
362+
end.publish(ctx);
363+
}
364+
}
365+
}
366+
367+
function tracingChannel(nameOrChannels) {
368+
return new TracingChannel(nameOrChannels);
369+
}
370+
142371
module.exports = {
143372
channel,
144373
hasSubscribers,
145374
subscribe,
375+
tracingChannel,
146376
unsubscribe,
147-
Channel
377+
Channel,
378+
TracingChannel
148379
};

0 commit comments

Comments
 (0)