Skip to content

Commit 5c904dd

Browse files
authored
add websocket to websocket diagnostic channels (#4321)
1 parent 44fc6d1 commit 5c904dd

File tree

8 files changed

+122
-89
lines changed

8 files changed

+122
-89
lines changed

docs/docs/api/DiagnosticsChannel.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,11 @@ This message is published after the client has successfully connected to a serve
169169
```js
170170
import diagnosticsChannel from 'diagnostics_channel'
171171

172-
diagnosticsChannel.channel('undici:websocket:open').subscribe(({ address, protocol, extensions }) => {
172+
diagnosticsChannel.channel('undici:websocket:open').subscribe(({ address, protocol, extensions, websocket }) => {
173173
console.log(address) // address, family, and port
174174
console.log(protocol) // negotiated subprotocols
175175
console.log(extensions) // negotiated extensions
176+
console.log(websocket) // the WebSocket instance
176177
})
177178
```
178179

@@ -184,7 +185,7 @@ This message is published after the connection has closed.
184185
import diagnosticsChannel from 'diagnostics_channel'
185186

186187
diagnosticsChannel.channel('undici:websocket:close').subscribe(({ websocket, code, reason }) => {
187-
console.log(websocket) // the WebSocket object
188+
console.log(websocket) // the WebSocket instance
188189
console.log(code) // the closing status code
189190
console.log(reason) // the closing reason
190191
})
@@ -209,9 +210,10 @@ This message is published after the client receives a ping frame, if the connect
209210
```js
210211
import diagnosticsChannel from 'diagnostics_channel'
211212

212-
diagnosticsChannel.channel('undici:websocket:ping').subscribe(({ payload }) => {
213+
diagnosticsChannel.channel('undici:websocket:ping').subscribe(({ payload, websocket }) => {
213214
// a Buffer or undefined, containing the optional application data of the frame
214215
console.log(payload)
216+
console.log(websocket) // the WebSocket instance
215217
})
216218
```
217219

@@ -222,8 +224,9 @@ This message is published after the client receives a pong frame.
222224
```js
223225
import diagnosticsChannel from 'diagnostics_channel'
224226

225-
diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload }) => {
227+
diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload, websocket }) => {
226228
// a Buffer or undefined, containing the optional application data of the frame
227229
console.log(payload)
230+
console.log(websocket) // the WebSocket instance
228231
})
229232
```

lib/web/websocket/connection.js

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
const { uid, states, sentCloseFrameState, emptyBuffer, opcodes } = require('./constants')
44
const { parseExtensions, isClosed, isClosing, isEstablished, validateCloseCodeAndReason } = require('./util')
5-
const { channels } = require('../../core/diagnostics')
65
const { makeRequest } = require('../fetch/request')
76
const { fetching } = require('../fetch/index')
87
const { Headers, getHeadersList } = require('../fetch/headers')
@@ -200,14 +199,6 @@ function establishWebSocketConnection (url, protocols, client, handler, options)
200199
response.socket.on('close', handler.onSocketClose)
201200
response.socket.on('error', handler.onSocketError)
202201

203-
if (channels.open.hasSubscribers) {
204-
channels.open.publish({
205-
address: response.socket.address(),
206-
protocol: secProtocol,
207-
extensions: secExtension
208-
})
209-
}
210-
211202
handler.wasEverConnected = true
212203
handler.onConnectionEstablished(response, extensions)
213204
}

lib/web/websocket/receiver.js

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
const { Writable } = require('node:stream')
44
const assert = require('node:assert')
55
const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants')
6-
const { channels } = require('../../core/diagnostics')
76
const {
87
isValidStatusCode,
98
isValidOpcode,
@@ -423,22 +422,13 @@ class ByteParser extends Writable {
423422

424423
this.#handler.socket.write(frame.createFrame(opcodes.PONG))
425424

426-
if (channels.ping.hasSubscribers) {
427-
channels.ping.publish({
428-
payload: body
429-
})
430-
}
425+
this.#handler.onPing(body)
431426
}
432427
} else if (opcode === opcodes.PONG) {
433428
// A Pong frame MAY be sent unsolicited. This serves as a
434429
// unidirectional heartbeat. A response to an unsolicited Pong frame is
435430
// not expected.
436-
437-
if (channels.pong.hasSubscribers) {
438-
channels.pong.publish({
439-
payload: body
440-
})
441-
}
431+
this.#handler.onPong(body)
442432
}
443433

444434
return true

lib/web/websocket/stream/websocketstream.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ class WebSocketStream {
6464
this.#handler.socket.destroy()
6565
},
6666
onSocketClose: () => this.#onSocketClose(),
67+
onPing: () => {},
68+
onPong: () => {},
6769

6870
readyState: states.CONNECTING,
6971
socket: null,

lib/web/websocket/websocket.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ const { channels } = require('../../core/diagnostics')
3535
* @property {(chunk: Buffer) => void} onSocketData
3636
* @property {(err: Error) => void} onSocketError
3737
* @property {() => void} onSocketClose
38+
* @property {(body: Buffer) => void} onPing
39+
* @property {(body: Buffer) => void} onPong
3840
*
3941
* @property {number} readyState
4042
* @property {import('stream').Duplex} socket
@@ -81,6 +83,22 @@ class WebSocket extends EventTarget {
8183
this.#handler.socket.destroy()
8284
},
8385
onSocketClose: () => this.#onSocketClose(),
86+
onPing: (body) => {
87+
if (channels.ping.hasSubscribers) {
88+
channels.ping.publish({
89+
payload: body,
90+
websocket: this
91+
})
92+
}
93+
},
94+
onPong: (body) => {
95+
if (channels.pong.hasSubscribers) {
96+
channels.pong.publish({
97+
payload: body,
98+
websocket: this
99+
})
100+
}
101+
},
84102

85103
readyState: states.CONNECTING,
86104
socket: null,
@@ -462,6 +480,15 @@ class WebSocket extends EventTarget {
462480

463481
// 4. Fire an event named open at the WebSocket object.
464482
fireEvent('open', this)
483+
484+
if (channels.open.hasSubscribers) {
485+
channels.open.publish({
486+
address: response.socket.address(),
487+
protocol: this.#protocol,
488+
extensions: this.#extensions,
489+
websocket: this
490+
})
491+
}
465492
}
466493

467494
#onFail (code, reason, cause) {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
'use strict'
2+
3+
const { test } = require('node:test')
4+
const dc = require('node:diagnostics_channel')
5+
const { WebSocketServer } = require('ws')
6+
const { WebSocket } = require('../..')
7+
const { tspl } = require('@matteo.collina/tspl')
8+
9+
test('diagnostics channel - undici:websocket:[open/close]', async (t) => {
10+
const { equal, completed } = tspl(t, { plan: 6 })
11+
12+
const server = new WebSocketServer({ port: 0 })
13+
const { port } = server.address()
14+
const ws = new WebSocket(`ws://localhost:${port}`, 'chat')
15+
16+
server.on('connection', (ws) => {
17+
ws.close(1000, 'goodbye')
18+
})
19+
20+
const openListener = ({ extensions, protocol, websocket }) => {
21+
equal(extensions, '')
22+
equal(protocol, 'chat')
23+
equal(websocket, ws)
24+
}
25+
26+
const closeListener = ({ websocket, code, reason }) => {
27+
equal(code, 1000)
28+
equal(reason, 'goodbye')
29+
equal(websocket, ws)
30+
}
31+
32+
dc.channel('undici:websocket:open').subscribe(openListener)
33+
dc.channel('undici:websocket:close').subscribe(closeListener)
34+
35+
t.after(() => {
36+
server.close()
37+
dc.channel('undici:websocket:open').unsubscribe(openListener)
38+
dc.channel('undici:websocket:close').unsubscribe(closeListener)
39+
})
40+
41+
await completed
42+
})
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
'use strict'
2+
3+
const { test } = require('node:test')
4+
const dc = require('node:diagnostics_channel')
5+
const { WebSocketServer } = require('ws')
6+
const { WebSocket } = require('../..')
7+
const { tspl } = require('@matteo.collina/tspl')
8+
9+
test('diagnostics channel - undici:websocket:[ping/pong]', async (t) => {
10+
const { deepStrictEqual, equal, completed } = tspl(t, { plan: 4 })
11+
12+
const server = new WebSocketServer({ port: 0 })
13+
const { port } = server.address()
14+
const ws = new WebSocket(`ws://localhost:${port}`, 'chat')
15+
16+
server.on('connection', (ws) => {
17+
ws.ping('Ping')
18+
ws.pong('Pong')
19+
})
20+
21+
const pingListener = ({ websocket, payload }) => {
22+
equal(websocket, ws)
23+
deepStrictEqual(payload, Buffer.from('Ping'))
24+
}
25+
26+
const pongListener = ({ websocket, payload }) => {
27+
equal(websocket, ws)
28+
deepStrictEqual(payload, Buffer.from('Pong'))
29+
}
30+
31+
dc.channel('undici:websocket:ping').subscribe(pingListener)
32+
dc.channel('undici:websocket:pong').subscribe(pongListener)
33+
34+
t.after(() => {
35+
server.close()
36+
ws.close()
37+
dc.channel('undici:websocket:ping').unsubscribe(pingListener)
38+
dc.channel('undici:websocket:pong').unsubscribe(pongListener)
39+
})
40+
41+
await completed
42+
})

test/websocket/diagnostics-channel.js

Lines changed: 0 additions & 64 deletions
This file was deleted.

0 commit comments

Comments
 (0)