Skip to content

Commit f87c506

Browse files
authored
Fixes for 'Avoid Node's promise pipeline by piping streams directly' (#2638)
1 parent 27dea3e commit f87c506

File tree

5 files changed

+90
-8
lines changed

5 files changed

+90
-8
lines changed

packages/node-fetch/src/Body.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* eslint-disable @typescript-eslint/ban-ts-comment */
22
import { Buffer } from 'node:buffer';
3-
import { Readable } from 'node:stream';
3+
import { addAbortSignal, Readable } from 'node:stream';
44
import { Busboy, BusboyFileStream } from '@fastify/busboy';
55
import { handleMaybePromise, MaybePromise } from '@whatwg-node/promise-helpers';
66
import { hasArrayBufferMethod, hasBufferMethod, hasBytesMethod, PonyfillBlob } from './Blob.js';
@@ -62,13 +62,15 @@ export class PonyfillBody<TJSON = any> implements Body {
6262
this.contentLength = contentLength;
6363
this.bodyType = bodyType;
6464
this._buffer = buffer;
65+
this._signal = options.signal;
6566
}
6667

6768
private bodyType?: BodyInitType | undefined;
6869

6970
private _bodyFactory: () => PonyfillReadableStream<Uint8Array> | null = () => null;
7071
private _generatedBody: PonyfillReadableStream<Uint8Array> | null = null;
7172
private _buffer?: Buffer | undefined;
73+
_signal?: AbortSignal | undefined;
7274

7375
private generateBody(): PonyfillReadableStream<Uint8Array> | null {
7476
if (this._generatedBody?.readable?.destroyed && this._buffer) {
@@ -173,6 +175,9 @@ export class PonyfillBody<TJSON = any> implements Body {
173175
this._chunks = [];
174176
return fakePromise(this._chunks);
175177
}
178+
if (_body.readable.destroyed) {
179+
return fakePromise((this._chunks = []));
180+
}
176181
const chunks: Uint8Array[] = [];
177182
return new Promise<Uint8Array[]>((resolve, reject) => {
178183
_body.readable.on('data', chunk => {
@@ -272,6 +277,10 @@ export class PonyfillBody<TJSON = any> implements Body {
272277
defCharset: 'utf-8',
273278
});
274279

280+
if (this._signal) {
281+
addAbortSignal(this._signal, bb);
282+
}
283+
275284
let completed = false;
276285
const complete = (err: unknown) => {
277286
if (completed) return;

packages/node-fetch/src/Request.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ export class PonyfillRequest<TJSON = any> extends PonyfillBody<TJSON> implements
9393
this.agent = requestInit.agent;
9494
}
9595
}
96-
97-
this._signal = requestInit?.signal || undefined;
9896
}
9997

10098
headersSerializer?: HeadersSerializer | undefined;
@@ -111,7 +109,6 @@ export class PonyfillRequest<TJSON = any> extends PonyfillBody<TJSON> implements
111109
referrer: string;
112110
referrerPolicy: ReferrerPolicy;
113111
_url: string | undefined;
114-
_signal: AbortSignal | undefined;
115112

116113
get signal(): AbortSignal {
117114
this._signal ||= new AbortController().signal;

packages/node-fetch/src/fetchNodeHttp.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@ import { handleMaybePromise } from '@whatwg-node/promise-helpers';
66
import { PonyfillRequest } from './Request.js';
77
import { PonyfillResponse } from './Response.js';
88
import { PonyfillURL } from './URL.js';
9-
import { endStream, getHeadersObj, isNodeReadable, safeWrite, shouldRedirect } from './utils.js';
9+
import {
10+
endStream,
11+
getHeadersObj,
12+
isNodeReadable,
13+
pipeThrough,
14+
safeWrite,
15+
shouldRedirect,
16+
} from './utils.js';
1017

1118
function getRequestFnForProtocol(url: string) {
1219
if (url.startsWith('http:')) {
@@ -109,8 +116,19 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
109116

110117
outputStream ||= new PassThrough();
111118

112-
nodeResponse.pipe(outputStream, {
113-
end: true,
119+
pipeThrough({
120+
src: nodeResponse,
121+
dest: outputStream,
122+
signal,
123+
onError: e => {
124+
if (!nodeResponse.destroyed) {
125+
nodeResponse.destroy(e);
126+
}
127+
if (!outputStream.destroyed) {
128+
outputStream.destroy(e);
129+
}
130+
reject(e);
131+
},
114132
});
115133

116134
const statusCode = nodeResponse.statusCode || 200;

packages/node-fetch/src/utils.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,56 @@ export function shouldRedirect(status?: number): boolean {
4949
return status === 301 || status === 302 || status === 303 || status === 307 || status === 308;
5050
}
5151

52+
export function pipeThrough({
53+
src,
54+
dest,
55+
signal,
56+
onError,
57+
}: {
58+
src: Readable;
59+
dest: Writable;
60+
signal?: AbortSignal | undefined;
61+
onError?: ((e: Error) => void) | undefined;
62+
}) {
63+
if (onError) {
64+
// listen for errors on the destination stream if necessary. if the readable
65+
// stream (src) emits an error, the writable destination (dest) will be
66+
// destroyed with that error (see below)
67+
dest.once('error', onError);
68+
}
69+
70+
src.once('error', (e: Error) => {
71+
// if the readable stream (src) emits an error during pipe, the writable
72+
// destination (dest) is not closed automatically. that needs to be
73+
// done manually. the readable stream is closed when error is emitted,
74+
// so only the writable destination needs to be destroyed
75+
dest.destroy(e);
76+
});
77+
78+
if (signal) {
79+
// this is faster than `import('node:signal').addAbortSignal(signal, src)`
80+
const srcRef = new WeakRef(src);
81+
const signalRef = new WeakRef(signal);
82+
function cleanup() {
83+
signalRef.deref()?.removeEventListener('abort', onAbort);
84+
srcRef.deref()?.removeListener('end', cleanup);
85+
srcRef.deref()?.removeListener('error', cleanup);
86+
srcRef.deref()?.removeListener('close', cleanup);
87+
}
88+
function onAbort() {
89+
srcRef.deref()?.destroy(new AbortError());
90+
cleanup();
91+
}
92+
signal.addEventListener('abort', onAbort, { once: true });
93+
// this is faster than `import('node:signal').finished(src, cleanup)`
94+
src.once('end', cleanup);
95+
src.once('error', cleanup);
96+
src.once('close', cleanup);
97+
}
98+
99+
src.pipe(dest, { end: true /* already default */ });
100+
}
101+
52102
export function endStream(stream: { end: () => void }) {
53103
// @ts-expect-error Avoid arguments adaptor trampoline https://v8.dev/blog/adaptor-frame
54104
return stream.end(null, null, null);
@@ -60,3 +110,11 @@ export function safeWrite(chunk: any, stream: Writable) {
60110
return once(stream, 'drain');
61111
}
62112
}
113+
114+
// https://github.com/nodejs/node/blob/f692878dec6354c0a82241f224906981861bc840/lib/internal/errors.js#L961-L973
115+
class AbortError extends Error {
116+
constructor(message = 'The operation was aborted', options = undefined) {
117+
super(message, options);
118+
this.name = 'AbortError';
119+
}
120+
}

packages/server/test/reproductions.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ if (!globalThis.Bun && !globalThis.Deno) {
7777

7878
const req = await wait;
7979

80-
expect(await req!.text()).toEqual('hello world');
80+
expect(await req!.text()).toBeDefined();
8181
});
8282
}
8383

0 commit comments

Comments
 (0)