Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 48 additions & 35 deletions src/listener.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { IncomingMessage, ServerResponse, OutgoingHttpHeaders } from 'node:http'
import { Http2ServerRequest } from 'node:http2'
import type { Http2ServerResponse } from 'node:http2'
import type { Writable } from 'node:stream'
import type { IncomingMessageWithWrapBodyStream } from './request'
import {
abortControllerKey,
Expand All @@ -12,7 +13,12 @@ import {
import { cacheKey, Response as LightweightResponse } from './response'
import type { InternalCache } from './response'
import type { CustomErrorHandler, FetchCallback, HttpBindings } from './types'
import { writeFromReadableStream, buildOutgoingHttpHeaders } from './utils'
import {
readWithoutBlocking,
writeFromReadableStream,
writeFromReadableStreamDefaultReader,
buildOutgoingHttpHeaders,
} from './utils'
import { X_ALREADY_SENT } from './utils/response/constants'
import './globals'

Expand All @@ -21,9 +27,6 @@ type OutgoingHasOutgoingEnded = Http2ServerResponse & {
[outgoingEnded]?: () => void
}

const regBuffer = /^no$/i
const regContentType = /^(application\/json\b|text\/(?!event-stream\b))/i

const handleRequestError = (): Response =>
new Response(null, {
status: 400,
Expand Down Expand Up @@ -122,41 +125,51 @@ const responseViaResponseObject = async (
const resHeaderRecord: OutgoingHttpHeaders = buildOutgoingHttpHeaders(res.headers)

if (res.body) {
/**
* If content-encoding is set, we assume that the response should be not decoded.
* Else if transfer-encoding is set, we assume that the response should be streamed.
* Else if content-length is set, we assume that the response content has been taken care of.
* Else if x-accel-buffering is set to no, we assume that the response should be streamed.
* Else if content-type is not application/json nor text/* but can be text/event-stream,
* we assume that the response should be streamed.
*/
const reader = res.body.getReader()

const {
'transfer-encoding': transferEncoding,
'content-encoding': contentEncoding,
'content-length': contentLength,
'x-accel-buffering': accelBuffering,
'content-type': contentType,
} = resHeaderRecord
const values: Uint8Array[] = []
let done = false
let currentReadPromise: Promise<ReadableStreamReadResult<Uint8Array>> | undefined = undefined

if (
transferEncoding ||
contentEncoding ||
contentLength ||
// nginx buffering variant
(accelBuffering && regBuffer.test(accelBuffering as string)) ||
!regContentType.test(contentType as string)
) {
outgoing.writeHead(res.status, resHeaderRecord)
flushHeaders(outgoing)
// For synchronous responses, at most two reads are usually needed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a simple and good idea!

for (let i = 0; i < 2; i++) {
currentReadPromise = reader.read()
const chunk = await readWithoutBlocking(currentReadPromise).catch((e) => {
console.error(e)
done = true
})
if (!chunk) {
// Error occurred or currentReadPromise is not yet resolved.
// If an error occurs, immediately break the loop.
// If currentReadPromise is not yet resolved, pass it to writeFromReadableStreamDefaultReader.
break
}
currentReadPromise = undefined

await writeFromReadableStream(res.body, outgoing)
} else {
const buffer = await res.arrayBuffer()
resHeaderRecord['content-length'] = buffer.byteLength
if (chunk.value) {
values.push(chunk.value)
}
if (chunk.done) {
done = true
break
}
}

if (done && !('content-length' in resHeaderRecord)) {
resHeaderRecord['content-length'] = values.reduce((acc, value) => acc + value.length, 0)
}

outgoing.writeHead(res.status, resHeaderRecord)
outgoing.end(new Uint8Array(buffer))
outgoing.writeHead(res.status, resHeaderRecord)
values.forEach((value) => {
;(outgoing as Writable).write(value)
})
if (done) {
outgoing.end()
} else {
if (values.length === 0) {
flushHeaders(outgoing)
}
await writeFromReadableStreamDefaultReader(reader, outgoing, currentReadPromise)
}
} else if (resHeaderRecord[X_ALREADY_SENT]) {
// do nothing, the response has already been sent
Expand Down
31 changes: 21 additions & 10 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import type { OutgoingHttpHeaders } from 'node:http'
import type { Writable } from 'node:stream'

export function writeFromReadableStream(stream: ReadableStream<Uint8Array>, writable: Writable) {
if (stream.locked) {
throw new TypeError('ReadableStream is locked.')
} else if (writable.destroyed) {
return
}

const reader = stream.getReader()
/**
 * Attempts to obtain the result of `readPromise` without waiting for a slow read.
 *
 * Races `readPromise` against a sentinel promise that settles to `undefined`
 * after a couple of microtask ticks (`Promise.resolve().then(...)`). If
 * `readPromise` has already settled — or settles within those ticks — its
 * read result wins the race; otherwise `undefined` is returned and the
 * still-pending `readPromise` can be handed off to a streaming writer.
 *
 * NOTE(review): the extra `.then(() => Promise.resolve(undefined))` wrapping
 * appears deliberate — it delays the sentinel by additional microtask ticks so
 * a synchronously-resolved read can win. Do not "simplify" it to a bare
 * `Promise.resolve(undefined)` without confirming the intended tick count.
 *
 * @param readPromise - a pending or settled `reader.read()` promise
 * @returns the read result if available quickly, otherwise `undefined`
 */
export async function readWithoutBlocking(
  readPromise: Promise<ReadableStreamReadResult<Uint8Array>>
): Promise<ReadableStreamReadResult<Uint8Array> | undefined> {
  return Promise.race([readPromise, Promise.resolve().then(() => Promise.resolve(undefined))])
}

export function writeFromReadableStreamDefaultReader(
reader: ReadableStreamDefaultReader<Uint8Array>,
writable: Writable,
currentReadPromise?: Promise<ReadableStreamReadResult<Uint8Array>> | undefined
) {
const handleError = () => {
// ignore the error
}

writable.on('error', handleError)

reader.read().then(flow, handleStreamError)
;(currentReadPromise ?? reader.read()).then(flow, handleStreamError)

return reader.closed.finally(() => {
writable.off('error', handleError)
Expand Down Expand Up @@ -48,6 +49,16 @@ export function writeFromReadableStream(stream: ReadableStream<Uint8Array>, writ
}
}

/**
 * Pipes a whole `ReadableStream` into a Node `Writable`.
 *
 * Validates that the stream is not already locked (throws `TypeError` if it
 * is), silently returns when the destination is already destroyed, and
 * otherwise acquires a reader and delegates the actual pumping to
 * `writeFromReadableStreamDefaultReader`.
 *
 * @param stream - source web stream of bytes
 * @param writable - destination Node writable
 * @returns the delegate's completion promise, or `undefined` when the
 *   writable was already destroyed
 * @throws {TypeError} when `stream` is locked to another reader
 */
export function writeFromReadableStream(stream: ReadableStream<Uint8Array>, writable: Writable) {
  // A locked stream cannot yield a new reader — fail loudly.
  if (stream.locked) {
    throw new TypeError('ReadableStream is locked.')
  }
  // Nothing to do if the destination is already gone.
  if (writable.destroyed) {
    return
  }
  const reader = stream.getReader()
  return writeFromReadableStreamDefaultReader(reader, writable)
}

export const buildOutgoingHttpHeaders = (
headers: Headers | HeadersInit | null | undefined
): OutgoingHttpHeaders => {
Expand Down
Loading