Skip to content

Commit ff052a3

Browse files
authored
fix(server): do not hang when server adapter's request is leaked and attempted to parse outside (#2208)
1 parent 9d027c5 commit ff052a3

File tree

15 files changed

+446
-361
lines changed

15 files changed

+446
-361
lines changed

.changeset/heavy-worms-battle.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
---
2+
'@whatwg-node/node-fetch': patch
3+
'@whatwg-node/server': patch
4+
---
5+
6+
When any `Request` method is called outside server adapter scope, it used to hang.
7+
This PR prevents it to hang and throw an error if the readable stream is destroyed earlier.
8+
9+
```ts
10+
let request: Request;
11+
const adapter = createServerAdapter(req => {
12+
request = req;
13+
return new Response('Hello World');
14+
});
15+
16+
await request.text(); // Was hanging but now throws an error
17+
```

packages/node-fetch/src/Body.ts

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/* eslint-disable @typescript-eslint/ban-ts-comment */
22
import { Buffer } from 'node:buffer';
3-
import { Readable } from 'node:stream';
3+
import { IncomingMessage } from 'node:http';
4+
import { PassThrough, Readable } from 'node:stream';
45
import busboy from 'busboy';
56
import { handleMaybePromise, MaybePromise } from '@whatwg-node/promise-helpers';
67
import { hasArrayBufferMethod, hasBufferMethod, hasBytesMethod, PonyfillBlob } from './Blob.js';
@@ -44,18 +45,24 @@ export interface FormDataLimits {
4445

4546
export interface PonyfillBodyOptions {
4647
formDataLimits?: FormDataLimits;
48+
signal?: AbortSignal;
4749
}
4850

4951
export class PonyfillBody<TJSON = any> implements Body {
5052
bodyUsed = false;
5153
contentType: string | null = null;
5254
contentLength: number | null = null;
55+
signal?: AbortSignal | null = null;
5356

5457
constructor(
5558
private bodyInit: BodyPonyfillInit | null,
5659
private options: PonyfillBodyOptions = {},
5760
) {
58-
const { bodyFactory, contentType, contentLength, bodyType, buffer } = processBodyInit(bodyInit);
61+
this.signal = options.signal || null;
62+
const { bodyFactory, contentType, contentLength, bodyType, buffer } = processBodyInit(
63+
bodyInit,
64+
options?.signal,
65+
);
5966
this._bodyFactory = bodyFactory;
6067
this.contentType = contentType;
6168
this.contentLength = contentLength;
@@ -77,6 +84,7 @@ export class PonyfillBody<TJSON = any> implements Body {
7784
return this._generatedBody;
7885
}
7986
const body = this._bodyFactory();
87+
8088
this._generatedBody = body;
8189
return body;
8290
}
@@ -134,12 +142,9 @@ export class PonyfillBody<TJSON = any> implements Body {
134142
return null;
135143
}
136144

137-
_chunks: Uint8Array[] | null = null;
145+
_chunks: MaybePromise<Uint8Array[]> | null = null;
138146

139-
_collectChunksFromReadable() {
140-
if (this._chunks) {
141-
return fakePromise(this._chunks);
142-
}
147+
_doCollectChunksFromReadableJob() {
143148
if (this.bodyType === BodyInitType.AsyncIterable) {
144149
if (Array.fromAsync) {
145150
return handleMaybePromise(
@@ -151,17 +156,18 @@ export class PonyfillBody<TJSON = any> implements Body {
151156
);
152157
}
153158
const iterator = (this.bodyInit as AsyncIterable<Uint8Array>)[Symbol.asyncIterator]();
159+
const chunks: Uint8Array[] = [];
154160
const collectValue = (): MaybePromise<Uint8Array[]> =>
155161
handleMaybePromise(
156162
() => iterator.next(),
157163
({ value, done }) => {
158-
this._chunks ||= [];
159164
if (value) {
160-
this._chunks.push(value);
165+
chunks.push(value);
161166
}
162167
if (!done) {
163168
return collectValue();
164169
}
170+
this._chunks = chunks;
165171
return this._chunks;
166172
},
167173
);
@@ -172,16 +178,20 @@ export class PonyfillBody<TJSON = any> implements Body {
172178
this._chunks = [];
173179
return fakePromise(this._chunks);
174180
}
175-
this._chunks = [];
176-
_body.readable.on('data', chunk => {
177-
this._chunks!.push(chunk);
178-
});
179-
return new Promise<Uint8Array[]>((resolve, reject) => {
180-
_body.readable.once('end', () => resolve(this._chunks!));
181-
_body.readable.once('error', reject);
181+
return _body.readable.toArray().then(chunks => {
182+
this._chunks = chunks;
183+
return this._chunks;
182184
});
183185
}
184186

187+
_collectChunksFromReadable() {
188+
if (this._chunks) {
189+
return fakePromise(this._chunks);
190+
}
191+
this._chunks ||= this._doCollectChunksFromReadableJob();
192+
return this._chunks;
193+
}
194+
185195
_blob: PonyfillBlob | null = null;
186196

187197
blob(): Promise<PonyfillBlob> {
@@ -373,7 +383,10 @@ export class PonyfillBody<TJSON = any> implements Body {
373383
}
374384
}
375385

376-
function processBodyInit(bodyInit: BodyPonyfillInit | null): {
386+
function processBodyInit(
387+
bodyInit: BodyPonyfillInit | null,
388+
signal?: AbortSignal,
389+
): {
377390
bodyType?: BodyInitType;
378391
contentType: string | null;
379392
contentLength: number | null;
@@ -402,13 +415,14 @@ function processBodyInit(bodyInit: BodyPonyfillInit | null): {
402415
};
403416
}
404417
if (Buffer.isBuffer(bodyInit)) {
418+
const buffer: Buffer = bodyInit;
405419
return {
406420
bodyType: BodyInitType.Buffer,
407421
contentType: null,
408422
contentLength: bodyInit.length,
409423
buffer: bodyInit,
410424
bodyFactory() {
411-
const readable = Readable.from(bodyInit);
425+
const readable = Readable.from(buffer);
412426
const body = new PonyfillReadableStream<Uint8Array>(readable);
413427
return body;
414428
},
@@ -429,20 +443,22 @@ function processBodyInit(bodyInit: BodyPonyfillInit | null): {
429443
};
430444
}
431445
if (bodyInit instanceof PonyfillReadableStream && bodyInit.readable != null) {
446+
const readableStream: PonyfillReadableStream<Uint8Array> = bodyInit;
432447
return {
433448
bodyType: BodyInitType.ReadableStream,
434-
bodyFactory: () => bodyInit,
449+
bodyFactory: () => readableStream,
435450
contentType: null,
436451
contentLength: null,
437452
};
438453
}
439454
if (isBlob(bodyInit)) {
455+
const blob = bodyInit as PonyfillBlob;
440456
return {
441457
bodyType: BodyInitType.Blob,
442458
contentType: bodyInit.type,
443459
contentLength: bodyInit.size,
444460
bodyFactory() {
445-
return bodyInit.stream() as PonyfillReadableStream<Uint8Array>;
461+
return blob.stream();
446462
},
447463
};
448464
}
@@ -461,6 +477,25 @@ function processBodyInit(bodyInit: BodyPonyfillInit | null): {
461477
},
462478
};
463479
}
480+
if (bodyInit instanceof IncomingMessage) {
481+
const passThrough = bodyInit.pipe(
482+
new PassThrough({
483+
signal,
484+
}),
485+
{
486+
end: true,
487+
},
488+
);
489+
490+
return {
491+
bodyType: BodyInitType.Readable,
492+
contentType: null,
493+
contentLength: null,
494+
bodyFactory() {
495+
return new PonyfillReadableStream<Uint8Array>(passThrough);
496+
},
497+
};
498+
}
464499
if (bodyInit instanceof Readable) {
465500
return {
466501
bodyType: BodyInitType.Readable,

packages/node-fetch/src/ReadableStream.ts

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Buffer } from 'node:buffer';
22
import { Readable } from 'node:stream';
3+
import { handleMaybePromise } from '@whatwg-node/promise-helpers';
34
import { fakePromise } from './utils.js';
45
import { PonyfillWritableStream } from './WritableStream.js';
56

@@ -73,20 +74,40 @@ export class PonyfillReadableStream<T> implements ReadableStream<T> {
7374
} else {
7475
let started = false;
7576
let ongoing = false;
76-
const readImpl = async (desiredSize: number) => {
77+
const handleStart = (desiredSize: number) => {
7778
if (!started) {
7879
const controller = createController(desiredSize, this.readable);
7980
started = true;
80-
await underlyingSource?.start?.(controller);
81-
controller._flush();
82-
if (controller._closed) {
83-
return;
84-
}
81+
return handleMaybePromise(
82+
() => underlyingSource?.start?.(controller),
83+
() => {
84+
controller._flush();
85+
if (controller._closed) {
86+
return false;
87+
}
88+
return true;
89+
},
90+
);
8591
}
86-
const controller = createController(desiredSize, this.readable);
87-
await underlyingSource?.pull?.(controller);
88-
controller._flush();
89-
ongoing = false;
92+
return true;
93+
};
94+
const readImpl = (desiredSize: number) => {
95+
return handleMaybePromise(
96+
() => handleStart(desiredSize),
97+
shouldContinue => {
98+
if (!shouldContinue) {
99+
return;
100+
}
101+
const controller = createController(desiredSize, this.readable);
102+
return handleMaybePromise(
103+
() => underlyingSource?.pull?.(controller),
104+
() => {
105+
controller._flush();
106+
ongoing = false;
107+
},
108+
);
109+
},
110+
);
90111
};
91112
this.readable = new Readable({
92113
read(desiredSize) {
@@ -243,6 +264,8 @@ export class PonyfillReadableStream<T> implements ReadableStream<T> {
243264
static from<T>(iterable: AsyncIterable<T> | Iterable<T>): PonyfillReadableStream<T> {
244265
return new PonyfillReadableStream(Readable.from(iterable));
245266
}
267+
268+
[Symbol.toStringTag] = 'ReadableStream';
246269
}
247270

248271
function isPonyfillReadableStream(obj: any): obj is PonyfillReadableStream<any> {

packages/node-fetch/src/Request.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ export class PonyfillRequest<TJSON = any> extends PonyfillBody<TJSON> implements
5454
requestInit = options;
5555
}
5656

57-
super(bodyInit, options);
57+
super(bodyInit, requestInit);
5858

5959
this._url = _url;
6060
this._parsedUrl = _parsedUrl;

packages/node-fetch/src/Response.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export class PonyfillResponse<TJSON = any> extends PonyfillBody<TJSON> implement
2626
this.url = init?.url || '';
2727
this.redirected = init?.redirected || false;
2828
this.type = init?.type || 'default';
29+
this.signal = init?.signal || null;
2930

3031
this.handleContentLengthHeader();
3132
}
@@ -74,4 +75,6 @@ export class PonyfillResponse<TJSON = any> extends PonyfillBody<TJSON> implement
7475
}
7576
return new PonyfillResponse<T>(JSON.stringify(data), init);
7677
}
78+
79+
[Symbol.toStringTag] = 'Response';
7780
}

packages/node-fetch/src/fetchNodeHttp.ts

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
2626
fetchRequest.parsedUrl?.protocol || fetchRequest.url,
2727
);
2828

29-
const nodeReadable = (
30-
fetchRequest.body != null
31-
? isNodeReadable(fetchRequest.body)
32-
? fetchRequest.body
33-
: Readable.from(fetchRequest.body)
34-
: null
35-
) as Readable | null;
3629
const headersSerializer: typeof getHeadersObj =
3730
(fetchRequest.headersSerializer as any) || getHeadersObj;
3831
const nodeHeaders = headersSerializer(fetchRequest.headers);
@@ -60,7 +53,7 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
6053
}
6154

6255
nodeRequest.once('response', nodeResponse => {
63-
let outputStream: PassThrough;
56+
let outputStream: PassThrough | undefined;
6457
const contentEncoding = nodeResponse.headers['content-encoding'];
6558
switch (contentEncoding) {
6659
case 'x-gzip':
@@ -78,8 +71,6 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
7871
case 'br':
7972
outputStream = createBrotliDecompress();
8073
break;
81-
default:
82-
outputStream = new PassThrough();
8374
}
8475
if (nodeResponse.headers.location && shouldRedirect(nodeResponse.statusCode)) {
8576
if (fetchRequest.redirect === 'error') {
@@ -106,36 +97,53 @@ export function fetchNodeHttp<TResponseJSON = any, TRequestJSON = any>(
10697
return;
10798
}
10899
}
109-
pipeline(nodeResponse, outputStream, {
110-
signal: fetchRequest.signal,
111-
end: true,
112-
})
113-
.then(() => {
114-
if (!nodeResponse.destroyed) {
115-
nodeResponse.resume();
116-
}
100+
if (outputStream != null) {
101+
pipeline(nodeResponse, outputStream, {
102+
signal: fetchRequest.signal,
103+
end: true,
117104
})
118-
.catch(reject);
105+
.then(() => {
106+
if (!nodeResponse.destroyed) {
107+
nodeResponse.resume();
108+
}
109+
})
110+
.catch(reject);
111+
}
119112

120113
const statusCode = nodeResponse.statusCode || 200;
121114
let statusText = nodeResponse.statusMessage || STATUS_CODES[statusCode];
122115
if (statusText == null) {
123116
statusText = '';
124117
}
125-
const ponyfillResponse = new PonyfillResponse(outputStream, {
118+
const ponyfillResponse = new PonyfillResponse(outputStream || nodeResponse, {
126119
status: statusCode,
127120
statusText,
128121
headers: nodeResponse.headers as Record<string, string>,
129122
url: fetchRequest.url,
123+
signal: fetchRequest.signal,
130124
});
131125
resolve(ponyfillResponse);
132126
});
133127
nodeRequest.once('error', reject);
134128

135-
if (nodeReadable) {
136-
nodeReadable.pipe(nodeRequest);
129+
if (fetchRequest['_buffer'] != null) {
130+
nodeRequest.write(fetchRequest['_buffer']);
131+
// @ts-expect-error Avoid arguments adaptor trampoline https://v8.dev/blog/adaptor-frame
132+
nodeRequest.end(null, null, null);
137133
} else {
138-
nodeRequest.end();
134+
const nodeReadable = (
135+
fetchRequest.body != null
136+
? isNodeReadable(fetchRequest.body)
137+
? fetchRequest.body
138+
: Readable.from(fetchRequest.body)
139+
: null
140+
) as Readable | null;
141+
if (nodeReadable) {
142+
nodeReadable.pipe(nodeRequest);
143+
} else {
144+
// @ts-expect-error Avoid arguments adaptor trampoline https://v8.dev/blog/adaptor-frame
145+
nodeRequest.end(null, null, null);
146+
}
139147
}
140148
} catch (e) {
141149
reject(e);

0 commit comments

Comments
 (0)