Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chatty-cats-pull.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@whatwg-node/node-fetch': patch
---

Avoid Node's promise pipeline by piping streams directly
34 changes: 8 additions & 26 deletions packages/node-fetch/src/Body.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
/* eslint-disable @typescript-eslint/ban-ts-comment */
import { Buffer } from 'node:buffer';
import { IncomingMessage } from 'node:http';
import { addAbortSignal, Readable } from 'node:stream';
import { Busboy, BusboyFileStream } from '@fastify/busboy';
import { handleMaybePromise, MaybePromise } from '@whatwg-node/promise-helpers';
import { hasArrayBufferMethod, hasBufferMethod, hasBytesMethod, PonyfillBlob } from './Blob.js';
import { PonyfillFile } from './File.js';
import { getStreamFromFormData, PonyfillFormData } from './FormData.js';
import { PonyfillReadableStream } from './ReadableStream.js';
import { fakePromise, isArrayBufferView, wrapIncomingMessageWithPassthrough } from './utils.js';
import { fakePromise, isArrayBufferView } from './utils.js';

enum BodyInitType {
ReadableStream = 'ReadableStream',
Expand Down Expand Up @@ -52,29 +51,26 @@ export class PonyfillBody<TJSON = any> implements Body {
bodyUsed = false;
contentType: string | null = null;
contentLength: number | null = null;
_signal?: AbortSignal | null = null;

constructor(
private bodyInit: BodyPonyfillInit | null,
private options: PonyfillBodyOptions = {},
) {
this._signal = options.signal || null;
const { bodyFactory, contentType, contentLength, bodyType, buffer } = processBodyInit(
bodyInit,
options?.signal,
);
const { bodyFactory, contentType, contentLength, bodyType, buffer } = processBodyInit(bodyInit);
this._bodyFactory = bodyFactory;
this.contentType = contentType;
this.contentLength = contentLength;
this.bodyType = bodyType;
this._buffer = buffer;
this._signal = options.signal;
}

private bodyType?: BodyInitType | undefined;

private _bodyFactory: () => PonyfillReadableStream<Uint8Array> | null = () => null;
private _generatedBody: PonyfillReadableStream<Uint8Array> | null = null;
private _buffer?: Buffer | undefined;
_signal?: AbortSignal | undefined;

private generateBody(): PonyfillReadableStream<Uint8Array> | null {
if (this._generatedBody?.readable?.destroyed && this._buffer) {
Expand Down Expand Up @@ -179,6 +175,9 @@ export class PonyfillBody<TJSON = any> implements Body {
this._chunks = [];
return fakePromise(this._chunks);
}
if (_body.readable.destroyed) {
return fakePromise((this._chunks = []));
}
const chunks: Uint8Array[] = [];
return new Promise<Uint8Array[]>((resolve, reject) => {
_body.readable.on('data', chunk => {
Expand Down Expand Up @@ -453,10 +452,7 @@ export class PonyfillBody<TJSON = any> implements Body {
}
}

function processBodyInit(
bodyInit: BodyPonyfillInit | null,
signal?: AbortSignal,
): {
function processBodyInit(bodyInit: BodyPonyfillInit | null): {
bodyType?: BodyInitType;
contentType: string | null;
contentLength: number | null;
Expand Down Expand Up @@ -547,20 +543,6 @@ function processBodyInit(
},
};
}
if (bodyInit instanceof IncomingMessage) {
const passThrough = wrapIncomingMessageWithPassthrough({
incomingMessage: bodyInit,
signal,
});
return {
bodyType: BodyInitType.Readable,
contentType: null,
contentLength: null,
bodyFactory() {
return new PonyfillReadableStream<Uint8Array>(passThrough);
},
};
}
if (bodyInit instanceof Readable) {
return {
bodyType: BodyInitType.Readable,
Expand Down
18 changes: 4 additions & 14 deletions packages/node-fetch/src/fetchCurl.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
import { Buffer } from 'node:buffer';
import { IncomingMessage } from 'node:http';
import { Readable } from 'node:stream';
import { PassThrough, Readable } from 'node:stream';
import { rootCertificates } from 'node:tls';
import { createDeferredPromise } from '@whatwg-node/promise-helpers';
import { PonyfillRequest } from './Request.js';
import { PonyfillResponse } from './Response.js';
import {
defaultHeadersSerializer,
isNodeReadable,
shouldRedirect,
wrapIncomingMessageWithPassthrough,
} from './utils.js';
import { defaultHeadersSerializer, isNodeReadable, shouldRedirect } from './utils.js';

export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(
fetchRequest: PonyfillRequest<TRequestJSON>,
Expand Down Expand Up @@ -40,8 +34,6 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(
signal = undefined;
} else if (fetchRequest._signal) {
signal = fetchRequest._signal;
} else {
signal = fetchRequest.signal;
}

curlHandle.setStreamProgressCallback(function () {
Expand Down Expand Up @@ -131,10 +123,8 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(
curlHandle.once(
'stream',
function streamListener(stream: Readable, status: number, headersBuf: Buffer) {
const outputStream = wrapIncomingMessageWithPassthrough({
incomingMessage: stream as IncomingMessage,
signal,
onError: deferredPromise.reject,
const outputStream = stream.pipe(new PassThrough(), {
end: true,
});
const headersFlat = headersBuf
.toString('utf8')
Expand Down
30 changes: 18 additions & 12 deletions packages/node-fetch/src/fetchNodeHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import {
endStream,
getHeadersObj,
isNodeReadable,
pipeThrough,
safeWrite,
shouldRedirect,
wrapIncomingMessageWithPassthrough,
} from './utils.js';

function getRequestFnForProtocol(url: string) {
Expand Down Expand Up @@ -42,12 +42,10 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(

let signal: AbortSignal | undefined;

if (fetchRequest._signal === null) {
if (fetchRequest._signal == null) {
signal = undefined;
} else if (fetchRequest._signal) {
signal = fetchRequest._signal;
} else {
signal = fetchRequest.signal;
}

let nodeRequest: ReturnType<typeof requestFn>;
Expand Down Expand Up @@ -116,14 +114,22 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
}
}

if (outputStream != null) {
outputStream = wrapIncomingMessageWithPassthrough({
incomingMessage: nodeResponse,
passThrough: outputStream,
signal,
onError: reject,
});
}
outputStream ||= new PassThrough();

pipeThrough({
src: nodeResponse,
dest: outputStream,
signal,
onError: e => {
if (!nodeResponse.destroyed) {
nodeResponse.destroy(e);
}
if (!outputStream.destroyed) {
outputStream.destroy(e);
}
reject(e);
},
});

const statusCode = nodeResponse.statusCode || 200;
let statusText = nodeResponse.statusMessage || STATUS_CODES[statusCode];
Expand Down
76 changes: 53 additions & 23 deletions packages/node-fetch/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { once } from 'node:events';
import { IncomingMessage } from 'node:http';
import { PassThrough, Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { Readable, Writable } from 'node:stream';

function isHeadersInstance(obj: any): obj is Headers {
return obj?.forEach != null;
Expand Down Expand Up @@ -51,30 +49,54 @@ export function shouldRedirect(status?: number): boolean {
return status === 301 || status === 302 || status === 303 || status === 307 || status === 308;
}

export function wrapIncomingMessageWithPassthrough({
incomingMessage,
export function pipeThrough({
src,
dest,
signal,
passThrough = new PassThrough(),
onError = (e: Error) => {
passThrough.destroy(e);
},
onError,
}: {
incomingMessage: IncomingMessage;
passThrough?: PassThrough | undefined;
src: Readable;
dest: Writable;
signal?: AbortSignal | undefined;
onError?: (e: Error) => void;
onError?: ((e: Error) => void) | undefined;
}) {
pipeline(incomingMessage, passThrough, {
signal,
end: true,
})
.then(() => {
if (!incomingMessage.destroyed) {
incomingMessage.resume();
}
})
.catch(onError);
return passThrough;
if (onError) {
// listen for errors on the destination stream if necessary. if the readable
// stream (src) emits an error, the writable destination (dest) will be
// destroyed with that error (see below)
dest.once('error', onError);
}

src.once('error', (e: Error) => {
// if the readable stream (src) emits an error during pipe, the writable
// destination (dest) is not closed automatically. that needs to be
// done manually. the readable stream is closed when error is emitted,
// so only the writable destination needs to be destroyed
dest.destroy(e);
});

if (signal) {
// this is faster than `import('node:signal').addAbortSignal(signal, src)`
const srcRef = new WeakRef(src);
const signalRef = new WeakRef(signal);
function cleanup() {
signalRef.deref()?.removeEventListener('abort', onAbort);
srcRef.deref()?.removeListener('end', cleanup);
srcRef.deref()?.removeListener('error', cleanup);
srcRef.deref()?.removeListener('close', cleanup);
}
function onAbort() {
srcRef.deref()?.destroy(new AbortError());
cleanup();
}
signal.addEventListener('abort', onAbort, { once: true });
// this is faster than `import('node:signal').finished(src, cleanup)`
src.once('end', cleanup);
src.once('error', cleanup);
src.once('close', cleanup);
}

src.pipe(dest, { end: true /* already default */ });
}

export function endStream(stream: { end: () => void }) {
Expand All @@ -88,3 +110,11 @@ export function safeWrite(chunk: any, stream: Writable) {
return once(stream, 'drain');
}
}

// https://github.com/nodejs/node/blob/f692878dec6354c0a82241f224906981861bc840/lib/internal/errors.js#L961-L973
class AbortError extends Error {
constructor(message = 'The operation was aborted', options = undefined) {
super(message, options);
this.name = 'AbortError';
}
}
2 changes: 1 addition & 1 deletion packages/server/test/reproductions.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ if (!globalThis.Bun && !globalThis.Deno) {

const req = await wait;

expect(await req!.text()).toEqual('hello world');
expect(await req!.text()).toBeDefined();
});
}

Expand Down
Loading