Skip to content

Commit c8352d7

Browse files
authored
Cleanup websocket (#3257)
* cleanup websocket receiver * fix 9.3.2-9.3.8; 9.4.2-9.4.8; 10.1.1 * fixup * fixup
1 parent f5025a8 commit c8352d7

File tree

1 file changed

+57
-132
lines changed

1 file changed

+57
-132
lines changed

lib/web/websocket/receiver.js

Lines changed: 57 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ const {
1212
websocketMessageReceived,
1313
utf8Decode,
1414
isControlFrame,
15-
isContinuationFrame,
16-
isTextBinaryFrame
15+
isTextBinaryFrame,
16+
isContinuationFrame
1717
} = require('./util')
1818
const { WebsocketFrameSend } = require('./frame')
19-
const { CloseEvent } = require('./events')
19+
const { closeWebSocketConnection } = require('./connection')
2020

2121
// This code was influenced by ws released under the MIT license.
2222
// Copyright (c) 2011 Einar Otto Stangvik <[email protected]>
@@ -26,6 +26,7 @@ const { CloseEvent } = require('./events')
2626
class ByteParser extends Writable {
2727
#buffers = []
2828
#byteOffset = 0
29+
#loop = false
2930

3031
#state = parserStates.INFO
3132

@@ -45,6 +46,7 @@ class ByteParser extends Writable {
4546
_write (chunk, _, callback) {
4647
this.#buffers.push(chunk)
4748
this.#byteOffset += chunk.length
49+
this.#loop = true
4850

4951
this.run(callback)
5052
}
@@ -55,7 +57,7 @@ class ByteParser extends Writable {
5557
* or not enough bytes are buffered to parse.
5658
*/
5759
run (callback) {
58-
while (true) {
60+
while (this.#loop) {
5961
if (this.#state === parserStates.INFO) {
6062
// If there aren't enough bytes to parse the payload length, etc.
6163
if (this.#byteOffset < 2) {
@@ -67,6 +69,13 @@ class ByteParser extends Writable {
6769
const opcode = buffer[0] & 0x0F
6870
const masked = (buffer[1] & 0x80) === 0x80
6971

72+
const fragmented = !fin && opcode !== opcodes.CONTINUATION
73+
const payloadLength = buffer[1] & 0x7F
74+
75+
const rsv1 = buffer[0] & 0x40
76+
const rsv2 = buffer[0] & 0x20
77+
const rsv3 = buffer[0] & 0x10
78+
7079
if (!isValidOpcode(opcode)) {
7180
failWebsocketConnection(this.ws, 'Invalid opcode received')
7281
return callback()
@@ -77,22 +86,16 @@ class ByteParser extends Writable {
7786
return callback()
7887
}
7988

80-
const rsv1 = (buffer[0] & 0x40) !== 0
81-
const rsv2 = (buffer[0] & 0x20) !== 0
82-
const rsv3 = (buffer[0] & 0x10) !== 0
83-
8489
// MUST be 0 unless an extension is negotiated that defines meanings
8590
// for non-zero values. If a nonzero value is received and none of
8691
// the negotiated extensions defines the meaning of such a nonzero
8792
// value, the receiving endpoint MUST _Fail the WebSocket
8893
// Connection_.
89-
if (rsv1 || rsv2 || rsv3) {
94+
if (rsv1 !== 0 || rsv2 !== 0 || rsv3 !== 0) {
9095
failWebsocketConnection(this.ws, 'RSV1, RSV2, RSV3 must be clear')
9196
return
9297
}
9398

94-
const fragmented = !fin && opcode !== opcodes.CONTINUATION
95-
9699
if (fragmented && !isTextBinaryFrame(opcode)) {
97100
// Only text and binary frames can be fragmented
98101
failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
@@ -101,39 +104,27 @@ class ByteParser extends Writable {
101104

102105
// If we are already parsing a text/binary frame and do not receive either
103106
// a continuation frame or close frame, fail the connection.
104-
if (isTextBinaryFrame(opcode) && this.#info.opcode !== undefined) {
107+
if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) {
105108
failWebsocketConnection(this.ws, 'Expected continuation frame')
106109
return
107110
}
108111

109-
const payloadLength = buffer[1] & 0x7F
110-
111-
if (isControlFrame(opcode)) {
112-
const loop = this.parseControlFrame(callback, {
113-
header: buffer,
114-
opcode,
115-
fragmented,
116-
payloadLength
117-
})
112+
if (this.#info.fragmented && fragmented) {
113+
// A fragmented frame can't be fragmented itself
114+
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
115+
return
116+
}
118117

119-
if (loop) {
120-
continue
121-
} else {
122-
return
123-
}
124-
} else if (isContinuationFrame(opcode)) {
125-
const loop = this.parseContinuationFrame(callback, {
126-
header: buffer,
127-
fin,
128-
fragmented,
129-
payloadLength
130-
})
118+
// "All control frames MUST have a payload length of 125 bytes or less
119+
// and MUST NOT be fragmented."
120+
if ((payloadLength > 125 || fragmented) && isControlFrame(opcode)) {
121+
failWebsocketConnection(this.ws, 'Control frame either too large or fragmented')
122+
return
123+
}
131124

132-
if (loop) {
133-
continue
134-
} else {
135-
return
136-
}
125+
if (isContinuationFrame(opcode) && this.#fragments.length === 0) {
126+
failWebsocketConnection(this.ws, 'Unexpected continuation frame')
127+
return
137128
}
138129

139130
if (payloadLength <= 125) {
@@ -145,16 +136,14 @@ class ByteParser extends Writable {
145136
this.#state = parserStates.PAYLOADLENGTH_64
146137
}
147138

139+
if (isTextBinaryFrame(opcode)) {
140+
this.#info.binaryType = opcode
141+
}
142+
148143
this.#info.opcode = opcode
149144
this.#info.masked = masked
150145
this.#info.fin = fin
151146
this.#info.fragmented = fragmented
152-
153-
if (this.#info.fragmented && payloadLength > 125) {
154-
// A fragmented frame can't be fragmented itself
155-
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
156-
return
157-
}
158147
} else if (this.#state === parserStates.PAYLOADLENGTH_16) {
159148
if (this.#byteOffset < 2) {
160149
return callback()
@@ -189,42 +178,40 @@ class ByteParser extends Writable {
189178
this.#state = parserStates.READ_DATA
190179
} else if (this.#state === parserStates.READ_DATA) {
191180
if (this.#byteOffset < this.#info.payloadLength) {
192-
// If there is still more data in this chunk that needs to be read
193181
return callback()
194-
} else if (this.#byteOffset >= this.#info.payloadLength) {
195-
const body = this.consume(this.#info.payloadLength)
182+
}
183+
184+
const body = this.consume(this.#info.payloadLength)
185+
186+
if (isControlFrame(this.#info.opcode)) {
187+
this.#loop = this.parseControlFrame(body)
188+
} else {
196189
this.#fragments.push(body)
197190

198191
// If the frame is not fragmented, a message has been received.
199192
// If the frame is fragmented, it will terminate with a fin bit set
200193
// and an opcode of 0 (continuation), therefore we handle that when
201194
// parsing continuation frames, not here.
202-
if (!this.#info.fragmented) {
195+
if (!this.#info.fragmented && this.#info.fin) {
203196
const fullMessage = Buffer.concat(this.#fragments)
204-
websocketMessageReceived(this.ws, this.#info.opcode, fullMessage)
205-
this.#info = {}
197+
websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage)
206198
this.#fragments.length = 0
207199
}
208-
209-
this.#state = parserStates.INFO
210200
}
211-
}
212201

213-
if (this.#byteOffset === 0 && this.#info.payloadLength !== 0) {
214-
callback()
215-
break
202+
this.#state = parserStates.INFO
216203
}
217204
}
218205
}
219206

220207
/**
221208
* Take n bytes from the buffered Buffers
222209
* @param {number} n
223-
* @returns {Buffer|null}
210+
* @returns {Buffer}
224211
*/
225212
consume (n) {
226213
if (n > this.#byteOffset) {
227-
return null
214+
throw new Error('Called consume() before buffers satiated.')
228215
} else if (n === 0) {
229216
return emptyBuffer
230217
}
@@ -297,40 +284,25 @@ class ByteParser extends Writable {
297284

298285
/**
299286
* Parses control frames.
300-
* @param {Buffer} data
301-
* @param {(err?: Error) => void} callback
302-
* @param {{ opcode: number, fragmented: boolean, payloadLength: number, header: Buffer }} info
287+
* @param {Buffer} body
303288
*/
304-
parseControlFrame (callback, info) {
305-
assert(!info.fragmented)
306-
307-
if (info.payloadLength > 125) {
308-
// Control frames can have a payload length of 125 bytes MAX
309-
failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.')
310-
return false
311-
} else if (this.#byteOffset < info.payloadLength) {
312-
this.#buffers.unshift(info.header)
313-
this.#byteOffset += 2
314-
315-
callback()
316-
return false
317-
}
318-
319-
const body = this.consume(info.payloadLength)
289+
parseControlFrame (body) {
290+
const { opcode, payloadLength } = this.#info
320291

321-
if (info.opcode === opcodes.CLOSE) {
322-
if (info.payloadLength === 1) {
292+
if (opcode === opcodes.CLOSE) {
293+
if (payloadLength === 1) {
323294
failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
324-
return
295+
return false
325296
}
326297

327298
this.#info.closeInfo = this.parseCloseBody(body)
328299

329300
if (this.#info.closeInfo.error) {
330301
const { code, reason } = this.#info.closeInfo
331302

332-
callback(new CloseEvent('close', { wasClean: false, reason, code }))
333-
return
303+
closeWebSocketConnection(this.ws, code, reason, reason.length)
304+
failWebsocketConnection(this.ws, reason)
305+
return false
334306
}
335307

336308
if (this.ws[kSentClose] !== sentCloseFrameState.SENT) {
@@ -362,9 +334,8 @@ class ByteParser extends Writable {
362334
this.ws[kReceivedClose] = true
363335

364336
this.end()
365-
366-
return
367-
} else if (info.opcode === opcodes.PING) {
337+
return false
338+
} else if (opcode === opcodes.PING) {
368339
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
369340
// response, unless it already received a Close frame.
370341
// A Pong frame sent in response to a Ping frame must have identical
@@ -381,12 +352,7 @@ class ByteParser extends Writable {
381352
})
382353
}
383354
}
384-
385-
if (this.#byteOffset <= 0) {
386-
callback()
387-
return false
388-
}
389-
} else if (info.opcode === opcodes.PONG) {
355+
} else if (opcode === opcodes.PONG) {
390356
// A Pong frame MAY be sent unsolicited. This serves as a
391357
// unidirectional heartbeat. A response to an unsolicited Pong frame is
392358
// not expected.
@@ -396,47 +362,6 @@ class ByteParser extends Writable {
396362
payload: body
397363
})
398364
}
399-
400-
if (this.#byteOffset <= 0) {
401-
callback()
402-
return false
403-
}
404-
}
405-
406-
return true
407-
}
408-
409-
/**
410-
* Parses continuation frames.
411-
* @param {Buffer} data
412-
* @param {(err?: Error) => void} callback
413-
* @param {{ fin: boolean, fragmented: boolean, payloadLength: number, header: Buffer }} info
414-
*/
415-
parseContinuationFrame (callback, info) {
416-
// If we received a continuation frame before we started parsing another frame.
417-
if (this.#info.opcode === undefined) {
418-
failWebsocketConnection(this.ws, 'Received unexpected continuation frame.')
419-
return false
420-
} else if (this.#byteOffset < info.payloadLength) {
421-
this.#buffers.unshift(info.header)
422-
this.#byteOffset += 2
423-
424-
callback()
425-
return false
426-
}
427-
428-
const body = this.consume(info.payloadLength)
429-
this.#fragments.push(body)
430-
431-
// A fragmented message consists of a single frame with the FIN bit
432-
// clear and an opcode other than 0, followed by zero or more frames
433-
// with the FIN bit clear and the opcode set to 0, and terminated by
434-
// a single frame with the FIN bit set and an opcode of 0.
435-
if (info.fin) {
436-
const message = Buffer.concat(this.#fragments)
437-
websocketMessageReceived(this.ws, this.#info.opcode, message)
438-
this.#fragments.length = 0
439-
this.#info = {}
440365
}
441366

442367
return true

0 commit comments

Comments
 (0)