Skip to content

Commit f3ce91a

Browse files
committed
feat: always respond via readableStream
1 parent 29a19ae commit f3ce91a

File tree

3 files changed

+205
-154
lines changed

3 files changed

+205
-154
lines changed

src/listener.ts

Lines changed: 48 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { IncomingMessage, ServerResponse, OutgoingHttpHeaders } from 'node:http'
22
import { Http2ServerRequest } from 'node:http2'
33
import type { Http2ServerResponse } from 'node:http2'
4+
import type { Writable } from 'node:stream'
45
import type { IncomingMessageWithWrapBodyStream } from './request'
56
import {
67
abortControllerKey,
@@ -12,7 +13,12 @@ import {
1213
import { cacheKey, Response as LightweightResponse } from './response'
1314
import type { InternalCache } from './response'
1415
import type { CustomErrorHandler, FetchCallback, HttpBindings } from './types'
15-
import { writeFromReadableStream, buildOutgoingHttpHeaders } from './utils'
16+
import {
17+
nonBlockingRead,
18+
writeFromReadableStream,
19+
writeFromReadableStreamDefaultReader,
20+
buildOutgoingHttpHeaders,
21+
} from './utils'
1622
import { X_ALREADY_SENT } from './utils/response/constants'
1723
import './globals'
1824

@@ -21,9 +27,6 @@ type OutgoingHasOutgoingEnded = Http2ServerResponse & {
2127
[outgoingEnded]?: () => void
2228
}
2329

24-
const regBuffer = /^no$/i
25-
const regContentType = /^(application\/json\b|text\/(?!event-stream\b))/i
26-
2730
const handleRequestError = (): Response =>
2831
new Response(null, {
2932
status: 400,
@@ -122,41 +125,51 @@ const responseViaResponseObject = async (
122125
const resHeaderRecord: OutgoingHttpHeaders = buildOutgoingHttpHeaders(res.headers)
123126

124127
if (res.body) {
125-
/**
126-
* If content-encoding is set, we assume that the response should be not decoded.
127-
* Else if transfer-encoding is set, we assume that the response should be streamed.
128-
* Else if content-length is set, we assume that the response content has been taken care of.
129-
* Else if x-accel-buffering is set to no, we assume that the response should be streamed.
130-
* Else if content-type is not application/json nor text/* but can be text/event-stream,
131-
* we assume that the response should be streamed.
132-
*/
128+
const reader = res.body.getReader()
133129

134-
const {
135-
'transfer-encoding': transferEncoding,
136-
'content-encoding': contentEncoding,
137-
'content-length': contentLength,
138-
'x-accel-buffering': accelBuffering,
139-
'content-type': contentType,
140-
} = resHeaderRecord
130+
const values: Uint8Array[] = []
131+
let done = false
132+
let currentReadPromise: Promise<ReadableStreamReadResult<Uint8Array>> | undefined = undefined
141133

142-
if (
143-
transferEncoding ||
144-
contentEncoding ||
145-
contentLength ||
146-
// nginx buffering variant
147-
(accelBuffering && regBuffer.test(accelBuffering as string)) ||
148-
!regContentType.test(contentType as string)
149-
) {
150-
outgoing.writeHead(res.status, resHeaderRecord)
151-
flushHeaders(outgoing)
134+
// In the case of synchronous responses, usually a maximum of two readings is done
135+
for (let i = 0; i < 2; i++) {
136+
currentReadPromise = reader.read()
137+
const chunk = await nonBlockingRead(currentReadPromise).catch((e) => {
138+
console.error(e)
139+
done = true
140+
})
141+
if (!chunk) {
142+
// Error occurred or currentReadPromise is not yet resolved.
143+
// If an error occurs, immediately break the loop.
144+
// If currentReadPromise is not yet resolved, pass it to writeFromReadableStreamDefaultReader.
145+
break
146+
}
147+
currentReadPromise = undefined
152148

153-
await writeFromReadableStream(res.body, outgoing)
154-
} else {
155-
const buffer = await res.arrayBuffer()
156-
resHeaderRecord['content-length'] = buffer.byteLength
149+
if (chunk.value) {
150+
values.push(chunk.value)
151+
}
152+
if (chunk.done) {
153+
done = true
154+
break
155+
}
156+
}
157+
158+
if (done) {
159+
resHeaderRecord['content-length'] = values.reduce((acc, value) => acc + value.length, 0)
160+
}
157161

158-
outgoing.writeHead(res.status, resHeaderRecord)
159-
outgoing.end(new Uint8Array(buffer))
162+
outgoing.writeHead(res.status, resHeaderRecord)
163+
values.forEach((value) => {
164+
;(outgoing as Writable).write(value)
165+
})
166+
if (done) {
167+
outgoing.end()
168+
} else {
169+
if (values.length === 0) {
170+
flushHeaders(outgoing)
171+
}
172+
await writeFromReadableStreamDefaultReader(reader, outgoing, currentReadPromise)
160173
}
161174
} else if (resHeaderRecord[X_ALREADY_SENT]) {
162175
// do nothing, the response has already been sent

src/utils.ts

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
import type { OutgoingHttpHeaders } from 'node:http'
22
import type { Writable } from 'node:stream'
33

4-
export function writeFromReadableStream(stream: ReadableStream<Uint8Array>, writable: Writable) {
5-
if (stream.locked) {
6-
throw new TypeError('ReadableStream is locked.')
7-
} else if (writable.destroyed) {
8-
return
9-
}
10-
11-
const reader = stream.getReader()
4+
export async function nonBlockingRead(
5+
readPromise: Promise<ReadableStreamReadResult<Uint8Array>>
6+
): Promise<ReadableStreamReadResult<Uint8Array> | undefined> {
7+
return Promise.race([readPromise, Promise.resolve().then(() => Promise.resolve(undefined))])
8+
}
129

10+
export function writeFromReadableStreamDefaultReader(
11+
reader: ReadableStreamDefaultReader<Uint8Array>,
12+
writable: Writable,
13+
currentReadPromise?: Promise<ReadableStreamReadResult<Uint8Array>> | undefined
14+
) {
1315
const handleError = () => {
1416
// ignore the error
1517
}
1618

1719
writable.on('error', handleError)
18-
19-
reader.read().then(flow, handleStreamError)
20+
;(currentReadPromise ?? reader.read()).then(flow, handleStreamError)
2021

2122
return reader.closed.finally(() => {
2223
writable.off('error', handleError)
@@ -48,6 +49,16 @@ export function writeFromReadableStream(stream: ReadableStream<Uint8Array>, writ
4849
}
4950
}
5051

52+
export function writeFromReadableStream(stream: ReadableStream<Uint8Array>, writable: Writable) {
53+
if (stream.locked) {
54+
throw new TypeError('ReadableStream is locked.')
55+
} else if (writable.destroyed) {
56+
return
57+
}
58+
59+
return writeFromReadableStreamDefaultReader(stream.getReader(), writable)
60+
}
61+
5162
export const buildOutgoingHttpHeaders = (
5263
headers: Headers | HeadersInit | null | undefined
5364
): OutgoingHttpHeaders => {

0 commit comments

Comments
 (0)