Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 48 additions & 26 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// Ported from https://github.com/nodejs/undici/pull/907

'use strict'

const assert = require('node:assert')
Expand Down Expand Up @@ -50,23 +48,32 @@ class BodyReadable extends Readable {

this[kAbort] = abort

/**
* @type {Consume | null}
*/
/** @type {Consume | null} */
this[kConsume] = null

/** @type {number} */
this[kBytesRead] = 0
/**
* @type {ReadableStream|null}
*/

/** @type {ReadableStream|null} */
this[kBody] = null

/** @type {boolean} */
this[kUsed] = false

/** @type {string} */
this[kContentType] = contentType

/** @type {number|null} */
this[kContentLength] = Number.isFinite(contentLength) ? contentLength : null

// Is stream being consumed through Readable API?
// This is an optimization so that we avoid checking
// for 'data' and 'readable' listeners in the hot path
// inside push().
/**
* Is stream being consumed through Readable API?
* This is an optimization so that we avoid checking
* for 'data' and 'readable' listeners in the hot path
* inside push().
*
* @type {boolean}
*/
this[kReading] = false
}

Expand Down Expand Up @@ -96,7 +103,7 @@ class BodyReadable extends Readable {
}

/**
* @param {string} event
* @param {string|symbol} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
Expand All @@ -109,7 +116,7 @@ class BodyReadable extends Readable {
}

/**
* @param {string} event
* @param {string|symbol} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
Expand Down Expand Up @@ -147,12 +154,14 @@ class BodyReadable extends Readable {
* @returns {boolean}
*/
push (chunk) {
this[kBytesRead] += chunk ? chunk.length : 0

if (this[kConsume] && chunk !== null) {
consumePush(this[kConsume], chunk)
return this[kReading] ? super.push(chunk) : true
if (chunk) {
this[kBytesRead] += chunk.length
if (this[kConsume]) {
consumePush(this[kConsume], chunk)
return this[kReading] ? super.push(chunk) : true
}
}

return super.push(chunk)
}

Expand Down Expand Up @@ -338,9 +347,23 @@ function isUnusable (bodyReadable) {
return util.isDisturbed(bodyReadable) || isLocked(bodyReadable)
}

/**
* @typedef {'text' | 'json' | 'blob' | 'bytes' | 'arrayBuffer'} ConsumeType
*/

/**
* @template {ConsumeType} T
* @typedef {T extends 'text' ? string :
* T extends 'json' ? unknown :
* T extends 'blob' ? Blob :
* T extends 'arrayBuffer' ? ArrayBuffer :
* T extends 'bytes' ? Uint8Array :
* never
* } ConsumeReturnType
*/
/**
* @typedef {object} Consume
* @property {string} type
* @property {ConsumeType} type
* @property {BodyReadable} stream
* @property {((value?: any) => void)} resolve
* @property {((err: Error) => void)} reject
Expand All @@ -349,9 +372,10 @@ function isUnusable (bodyReadable) {
*/

/**
* @template {ConsumeType} T
* @param {BodyReadable} stream
* @param {string} type
* @returns {Promise<any>}
* @param {T} type
* @returns {Promise<ConsumeReturnType<T>>}
*/
function consume (stream, type) {
assert(!stream[kConsume])
Expand All @@ -361,9 +385,7 @@ function consume (stream, type) {
const rState = stream._readableState
if (rState.destroyed && rState.closeEmitted === false) {
stream
.on('error', err => {
reject(err)
})
.on('error', reject)
.on('close', () => {
reject(new TypeError('unusable'))
})
Expand Down Expand Up @@ -438,7 +460,7 @@ function consumeStart (consume) {
/**
* @param {Buffer[]} chunks
* @param {number} length
* @param {BufferEncoding} encoding
* @param {BufferEncoding} [encoding='utf8']
* @returns {string}
*/
function chunksDecode (chunks, length, encoding) {
Expand Down
Loading