Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,45 @@ export function writeFromReadableStream(stream: ReadableStream<Uint8Array>, writ
if (stream.locked) {
throw new TypeError('ReadableStream is locked.')
} else if (writable.destroyed) {
stream.cancel()
return
}

const reader = stream.getReader()
writable.on('close', cancel)
writable.on('error', cancel)
reader.read().then(flow, cancel)

const handleError = () => {
// ignore the error
}

writable.on('error', handleError)

reader.read().then(flow, handleStreamError)

return reader.closed.finally(() => {
writable.off('close', cancel)
writable.off('error', cancel)
writable.off('error', handleError)
})

// eslint-disable-next-line @typescript-eslint/no-explicit-any
function cancel(error?: any) {
reader.cancel(error).catch(() => {})
function handleStreamError(error: any) {
if (error) {
writable.destroy(error)
}
}

function onDrain() {
reader.read().then(flow, cancel)
reader.read().then(flow, handleStreamError)
}

function flow({ done, value }: ReadableStreamReadResult<Uint8Array>): void | Promise<void> {
try {
if (done) {
writable.end()
} else if (!writable.write(value)) {
writable.once('drain', onDrain)
} else {
return reader.read().then(flow, cancel)
return reader.read().then(flow, handleStreamError)
}
} catch (e) {
cancel(e)
handleStreamError(e)
}
}
}
Expand Down
41 changes: 40 additions & 1 deletion test/utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { buildOutgoingHttpHeaders } from '../src/utils'
import { Writable } from 'node:stream'
import { buildOutgoingHttpHeaders, writeFromReadableStream } from '../src/utils'

describe('buildOutgoingHttpHeaders', () => {
it('original content-type is preserved', () => {
Expand Down Expand Up @@ -71,3 +72,41 @@ describe('buildOutgoingHttpHeaders', () => {
})
})
})

describe('writeFromReadableStream', () => {
it('should handle client disconnection gracefully without canceling stream', async () => {
let enqueueCalled = false
let cancelCalled = false

// Create test ReadableStream
const stream = new ReadableStream({
start(controller) {
setTimeout(() => {
try {
controller.enqueue(new TextEncoder().encode('test'))
enqueueCalled = true
} catch {
// Test should fail if error occurs
}
controller.close()
}, 100)
},
cancel() {
cancelCalled = true
},
})

// Test Writable stream
const writable = new Writable()

// Simulate client disconnection after 50ms
setTimeout(() => {
writable.destroy()
}, 50)

await writeFromReadableStream(stream, writable)

expect(enqueueCalled).toBe(true) // enqueue should succeed
expect(cancelCalled).toBe(false) // cancel should not be called
})
})
Loading