Skip to content

Commit e72265b

Browse files
author
Stephen Belanger
committed
async_hooks: AsyncLocalStorage to diagnostics_channel integration
1 parent 789d1e0 commit e72265b

File tree

4 files changed

+118
-4
lines changed

4 files changed

+118
-4
lines changed

lib/_http_server.js

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
9292

9393
const dc = require('diagnostics_channel');
9494
const onRequestStartChannel = dc.channel('http.server.request.start');
95+
const onRequestEndChannel = dc.channel('http.server.request.end');
9596
const onResponseFinishChannel = dc.channel('http.server.response.finish');
9697

9798
const kServerResponse = Symbol('ServerResponse');
@@ -996,13 +997,18 @@ function parserOnIncoming(server, socket, state, req, keepAlive) {
996997
res.shouldKeepAlive = keepAlive;
997998
res[kUniqueHeaders] = server[kUniqueHeaders];
998999

999-
if (onRequestStartChannel.hasSubscribers) {
1000-
onRequestStartChannel.publish({
1000+
const dcMessage = (
1001+
onRequestStartChannel.hasSubscribers ||
1002+
onRequestEndChannel.hasSubscribers
1003+
) ? {
10011004
request: req,
10021005
response: res,
10031006
socket,
10041007
server
1005-
});
1008+
} : undefined;
1009+
1010+
if (onRequestStartChannel.hasSubscribers) {
1011+
onRequestStartChannel.publish(dcMessage);
10061012
}
10071013

10081014
if (socket._httpMessage) {
@@ -1063,6 +1069,10 @@ function parserOnIncoming(server, socket, state, req, keepAlive) {
10631069
server.emit('request', req, res);
10641070
}
10651071

1072+
if (onRequestEndChannel.hasSubscribers) {
1073+
onRequestEndChannel.publish(dcMessage);
1074+
}
1075+
10661076
return 0; // No special treatment.
10671077
}
10681078

lib/async_hooks.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const {
99
FunctionPrototypeBind,
1010
NumberIsSafeInteger,
1111
ObjectDefineProperties,
12+
ObjectPrototypeHasOwnProperty,
1213
ObjectIs,
1314
ReflectApply,
1415
Symbol,
@@ -26,6 +27,7 @@ const {
2627
validateString,
2728
} = require('internal/validators');
2829
const internal_async_hooks = require('internal/async_hooks');
30+
const { tracingChannel } = require('diagnostics_channel');
2931

3032
// Get functions
3133
// For userland AsyncResources, make sure to emit a destroy event when the
@@ -351,6 +353,47 @@ class AsyncLocalStorage {
351353
return resource[this.kResourceStore];
352354
}
353355
}
356+
357+
bindToTracingChannel(channel, transform = (v) => v) {
358+
if (typeof channel === 'string') {
359+
channel = tracingChannel(channel);
360+
}
361+
362+
const store = this;
363+
364+
function start(register) {
365+
register(store, transform);
366+
}
367+
368+
channel.subscribe({ start });
369+
return () => {
370+
channel.unsubscribe({ start });
371+
};
372+
}
373+
374+
static runInTracingChannel(channel, data, runner) {
375+
if (typeof channel === 'string') {
376+
channel = tracingChannel(channel);
377+
}
378+
379+
const bindings = [];
380+
381+
function register(store, transform) {
382+
bindings.push({ store, data: transform(data) });
383+
}
384+
385+
return channel.traceSync(() => {
386+
for (const { store, data } of bindings) {
387+
runner = wrapRunInStoreBinding(store, data, runner);
388+
}
389+
390+
return runner();
391+
}, register);
392+
}
393+
}
394+
395+
function wrapRunInStoreBinding(store, data, next) {
396+
return () => store.run(data, next);
354397
}
355398

356399
// Placing all exports down here because the exported classes won't export

lib/diagnostics_channel.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,5 +289,6 @@ module.exports = {
289289
subscribe,
290290
tracingChannel,
291291
unsubscribe,
292-
Channel
292+
Channel,
293+
TracingChannel
293294
};
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { AsyncLocalStorage } = require('async_hooks');
6+
7+
const contextA = {
8+
foo: 'bar'
9+
};
10+
const contextB = {
11+
baz: 'buz'
12+
};
13+
14+
const expected = [
15+
contextA,
16+
contextB
17+
];
18+
19+
let builds = 0;
20+
let runs = 0;
21+
22+
const storageA = new AsyncLocalStorage();
23+
const storageB = new AsyncLocalStorage();
24+
25+
// Check both passthrough bindings and transformed bindings
26+
storageA.bindToTracingChannel('test');
27+
storageB.bindToTracingChannel('test', common.mustCall((received) => {
28+
assert.deepStrictEqual(received, expected[builds++]);
29+
return { received };
30+
}, 2));
31+
32+
function check(context) {
33+
assert.deepStrictEqual(storageA.getStore(), context);
34+
assert.deepStrictEqual(storageB.getStore(), {
35+
received: context
36+
});
37+
}
38+
39+
// Should have no context before run
40+
assert.strictEqual(storageA.getStore(), undefined);
41+
assert.strictEqual(storageB.getStore(), undefined);
42+
43+
AsyncLocalStorage.runInTracingChannel('test', expected[runs], common.mustCall(() => {
44+
// Should have set expected context
45+
check(expected[runs]);
46+
47+
// Should support nested contexts
48+
runs++;
49+
AsyncLocalStorage.runInTracingChannel('test', expected[runs], common.mustCall(() => {
50+
check(expected[runs]);
51+
}));
52+
runs--;
53+
54+
// Should have restored outer context
55+
check(expected[runs]);
56+
}));
57+
58+
// Should have no context after run
59+
assert.strictEqual(storageA.getStore(), undefined);
60+
assert.strictEqual(storageB.getStore(), undefined);

0 commit comments

Comments
 (0)