Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,22 @@ diagnosticsChannel.channel('undici:request:create').subscribe(({ request }) => {

Note: a request is only loosely completed to a given socket.

## `undici:request:bodyChunkSent`

This message is published when a chunk of the request body is being sent.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:request:bodyChunkSent').subscribe(({ request, chunk }) => {
// request is the same object undici:request:create
})
```

## `undici:request:bodySent`

This message is published after the request body has been fully sent.

```js
import diagnosticsChannel from 'diagnostics_channel'

Expand All @@ -54,6 +67,18 @@ diagnosticsChannel.channel('undici:request:headers').subscribe(({ request, respo
})
```

## `undici:request:bodyChunkReceived`

This message is published after a chunk of the response body has been received.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:request:bodyChunkReceived').subscribe(({ request, chunk }) => {
// request is the same object undici:request:create
})
```

## `undici:request:trailers`

This message is published after the response body and trailers have been received, i.e. the response has been completed.
Expand Down
2 changes: 2 additions & 0 deletions lib/core/diagnostics.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const channels = {
// Request
create: diagnosticsChannel.channel('undici:request:create'),
bodySent: diagnosticsChannel.channel('undici:request:bodySent'),
bodyChunkSent: diagnosticsChannel.channel('undici:request:bodyChunkSent'),
bodyChunkReceived: diagnosticsChannel.channel('undici:request:bodyChunkReceived'),
headers: diagnosticsChannel.channel('undici:request:headers'),
trailers: diagnosticsChannel.channel('undici:request:trailers'),
error: diagnosticsChannel.channel('undici:request:error'),
Expand Down
6 changes: 6 additions & 0 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ class Request {
}

onBodySent (chunk) {
if (channels.bodyChunkSent.hasSubscribers) {
channels.bodyChunkSent.publish({ request: this, chunk })
}
if (this[kHandler].onBodySent) {
try {
return this[kHandler].onBodySent(chunk)
Expand Down Expand Up @@ -252,6 +255,9 @@ class Request {
assert(!this.aborted)
assert(!this.completed)

if (channels.bodyChunkReceived.hasSubscribers) {
channels.bodyChunkReceived.publish({ request: this, chunk })
}
try {
return this[kHandler].onData(chunk)
} catch (err) {
Expand Down
22 changes: 21 additions & 1 deletion test/node-test/diagnostics-channel/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const { Client } = require('../../..')
const { createServer } = require('node:http')

test('Diagnostics channel - get', (t) => {
const assert = tspl(t, { plan: 32 })
const assert = tspl(t, { plan: 36 })
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.setHeader('Content-Type', 'text/plain')
res.setHeader('trailer', 'foo')
Expand Down Expand Up @@ -105,16 +105,36 @@ test('Diagnostics channel - get', (t) => {
assert.equal(response.statusText, 'OK')
})

let bodySent = false
diagnosticsChannel.channel('undici:request:bodySent').subscribe(({ request }) => {
assert.equal(_req, request)
bodySent = true
})
diagnosticsChannel.channel('undici:request:bodyChunkSent').subscribe(() => {
assert.fail('should not emit undici:request:bodyChunkSent for GET requests')
})

let endEmitted = false

return new Promise((resolve) => {
const respChunks = []
diagnosticsChannel.channel('undici:request:bodyChunkReceived').subscribe(({ request, chunk }) => {
assert.equal(_req, request)
respChunks.push(chunk)
})

diagnosticsChannel.channel('undici:request:trailers').subscribe(({ request, trailers }) => {
assert.equal(bodySent, true)
assert.equal(request.completed, true)
assert.equal(_req, request)
// This event is emitted after the last chunk has been added to the body stream,
// not when it was consumed by the application
assert.equal(endEmitted, false)
assert.deepStrictEqual(trailers, [Buffer.from('foo'), Buffer.from('oof')])

const respData = Buffer.concat(respChunks)
assert.deepStrictEqual(respData, Buffer.from('hello'))

resolve()
})

Expand Down
25 changes: 24 additions & 1 deletion test/node-test/diagnostics-channel/post-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const { Client } = require('../../..')
const { createServer } = require('node:http')

test('Diagnostics channel - post stream', (t) => {
const assert = tspl(t, { plan: 33 })
const assert = tspl(t, { plan: 43 })
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
req.resume()
res.setHeader('Content-Type', 'text/plain')
Expand Down Expand Up @@ -107,20 +107,43 @@ test('Diagnostics channel - post stream', (t) => {
assert.equal(response.statusText, 'OK')
})

let bodySent = false
const bodyChunks = []
diagnosticsChannel.channel('undici:request:bodyChunkSent').subscribe(({ request, chunk }) => {
assert.equal(_req, request)
// Chunk can be a string or a Buffer, depending on the stream writer.
assert.equal(typeof chunk, 'string')
bodyChunks.push(Buffer.from(chunk))
})
diagnosticsChannel.channel('undici:request:bodySent').subscribe(({ request }) => {
assert.equal(_req, request)
bodySent = true

const requestBody = Buffer.concat(bodyChunks)
assert.deepStrictEqual(requestBody, Buffer.from('hello world'))
})

let endEmitted = false

return new Promise((resolve) => {
const respChunks = []
diagnosticsChannel.channel('undici:request:bodyChunkReceived').subscribe(({ request, chunk }) => {
assert.equal(_req, request)
respChunks.push(chunk)
})

diagnosticsChannel.channel('undici:request:trailers').subscribe(({ request, trailers }) => {
assert.equal(bodySent, true)
assert.equal(request.completed, true)
assert.equal(_req, request)
// This event is emitted after the last chunk has been added to the body stream,
// not when it was consumed by the application
assert.equal(endEmitted, false)
assert.deepStrictEqual(trailers, [Buffer.from('foo'), Buffer.from('oof')])

const respData = Buffer.concat(respChunks)
assert.deepStrictEqual(respData, Buffer.from('hello'))

resolve()
})

Expand Down
24 changes: 23 additions & 1 deletion test/node-test/diagnostics-channel/post.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const { Client } = require('../../../')
const { createServer } = require('node:http')

test('Diagnostics channel - post', (t) => {
const assert = tspl(t, { plan: 33 })
const assert = tspl(t, { plan: 39 })
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
req.resume()
res.setHeader('Content-Type', 'text/plain')
Expand Down Expand Up @@ -105,20 +105,42 @@ test('Diagnostics channel - post', (t) => {
assert.equal(response.statusText, 'OK')
})

let bodySent = false
const bodyChunks = []
diagnosticsChannel.channel('undici:request:bodyChunkSent').subscribe(({ request, chunk }) => {
assert.equal(_req, request)
assert.equal(Buffer.isBuffer(chunk), true)
bodyChunks.push(chunk)
})
diagnosticsChannel.channel('undici:request:bodySent').subscribe(({ request }) => {
assert.equal(_req, request)
bodySent = true

const requestBody = Buffer.concat(bodyChunks)
assert.deepStrictEqual(requestBody, Buffer.from('hello world'))
})

let endEmitted = false

return new Promise((resolve) => {
const respChunks = []
diagnosticsChannel.channel('undici:request:bodyChunkReceived').subscribe(({ request, chunk }) => {
assert.equal(_req, request)
respChunks.push(chunk)
})

diagnosticsChannel.channel('undici:request:trailers').subscribe(({ request, trailers }) => {
assert.equal(bodySent, true)
assert.equal(request.completed, true)
assert.equal(_req, request)
// This event is emitted after the last chunk has been added to the body stream,
// not when it was consumed by the application
assert.equal(endEmitted, false)
assert.deepStrictEqual(trailers, [Buffer.from('foo'), Buffer.from('oof')])

const respData = Buffer.concat(respChunks)
assert.deepStrictEqual(respData, Buffer.from('hello'))

resolve()
})

Expand Down
3 changes: 3 additions & 0 deletions test/types/diagnostics-channel.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ const connectParams = {
}

expectAssignable<DiagnosticsChannel.RequestCreateMessage>({ request })
expectAssignable<DiagnosticsChannel.RequestBodyChunkSentMessage>({ request, chunk: Buffer.from('') })
expectAssignable<DiagnosticsChannel.RequestBodyChunkSentMessage>({ request, chunk: '' })
expectAssignable<DiagnosticsChannel.RequestBodySentMessage>({ request })
expectAssignable<DiagnosticsChannel.RequestHeadersMessage>({
request,
response
})
expectAssignable<DiagnosticsChannel.RequestBodyChunkReceivedMessage>({ request, chunk: Buffer.from('') })
expectAssignable<DiagnosticsChannel.RequestTrailersMessage>({
request,
trailers: [Buffer.from(''), Buffer.from('')]
Expand Down
9 changes: 9 additions & 0 deletions types/diagnostics-channel.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ declare namespace DiagnosticsChannel {
export interface RequestBodySentMessage {
request: Request;
}

export interface RequestBodyChunkSentMessage {
request: Request;
chunk: Uint8Array | string;
}
export interface RequestBodyChunkReceivedMessage {
request: Request;
chunk: Buffer;
}
export interface RequestHeadersMessage {
request: Request;
response: Response;
Expand Down
Loading