Skip to content

Commit 0411240

Browse files
committed
Implement buffering in pipeable streams
The previous implementation of pipeable streaming (Node) suffered some performance issues brought about by the high chunk counts and innefficiencies with how node streams handle this situation. In particular the use of cork/uncork was meant to alleviate this but these methods do not do anything unless the receiving Writable Stream implements _writev which many won't. This change adopts the view based buffering techniques previously implemented for the Browser execution context. The main difference is the use of backpressure provided by the writable stream which is not implementable in the other context. Another change to note is the use of standards constructs like TextEncoder and TypedArrays.
1 parent fc47cb1 commit 0411240

File tree

1 file changed

+72
-16
lines changed

1 file changed

+72
-16
lines changed

packages/react-server/src/ReactServerStreamConfigNode.js

Lines changed: 72 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
*/
99

1010
import type {Writable} from 'stream';
11+
import {TextEncoder} from 'util';
1112

1213
type MightBeFlushable = {
1314
flush?: () => void,
@@ -17,7 +18,7 @@ type MightBeFlushable = {
1718
export type Destination = Writable & MightBeFlushable;
1819

1920
export type PrecomputedChunk = Uint8Array;
20-
export type Chunk = string;
21+
export type Chunk = Uint8Array;
2122

2223
export function scheduleWork(callback: () => void) {
2324
setImmediate(callback);
@@ -33,46 +34,101 @@ export function flushBuffered(destination: Destination) {
3334
}
3435
}
3536

37+
const VIEW_SIZE = 2048;
38+
let currentView = null;
39+
let writtenBytes = 0;
40+
let destinationHasCapacity = true;
41+
3642
export function beginWriting(destination: Destination) {
37-
// Older Node streams like http.createServer don't have this.
38-
if (typeof destination.cork === 'function') {
39-
destination.cork();
40-
}
43+
currentView = new Uint8Array(VIEW_SIZE);
44+
writtenBytes = 0;
45+
destinationHasCapacity = true;
4146
}
4247

4348
export function writeChunk(
4449
destination: Destination,
45-
chunk: Chunk | PrecomputedChunk,
50+
chunk: PrecomputedChunk | Chunk,
4651
): void {
47-
const nodeBuffer = ((chunk: any): Buffer | string); // close enough
48-
destination.write(nodeBuffer);
52+
if (chunk.byteLength === 0) {
53+
return;
54+
}
55+
56+
if (chunk.byteLength > VIEW_SIZE) {
57+
// this chunk may overflow a single view which implies it was not
58+
// one that is cached by the streaming renderer. We will enqueu
59+
// it directly and expect it is not re-used
60+
if (writtenBytes > 0) {
61+
writeToDestination(
62+
destination,
63+
((currentView: any): Uint8Array).subarray(0, writtenBytes),
64+
);
65+
currentView = new Uint8Array(VIEW_SIZE);
66+
writtenBytes = 0;
67+
}
68+
writeToDestination(destination, chunk);
69+
return;
70+
}
71+
72+
let bytesToWrite = chunk;
73+
const allowableBytes = ((currentView: any): Uint8Array).length - writtenBytes;
74+
if (allowableBytes < bytesToWrite.byteLength) {
75+
// this chunk would overflow the current view. We enqueue a full view
76+
// and start a new view with the remaining chunk
77+
if (allowableBytes === 0) {
78+
// the current view is already full, send it
79+
writeToDestination(destination, (currentView: any));
80+
} else {
81+
// fill up the current view and apply the remaining chunk bytes
82+
// to a new view.
83+
((currentView: any): Uint8Array).set(
84+
bytesToWrite.subarray(0, allowableBytes),
85+
writtenBytes,
86+
);
87+
writtenBytes += allowableBytes;
88+
writeToDestination(destination, (currentView: any));
89+
bytesToWrite = bytesToWrite.subarray(allowableBytes);
90+
}
91+
currentView = new Uint8Array(VIEW_SIZE);
92+
writtenBytes = 0;
93+
}
94+
((currentView: any): Uint8Array).set(bytesToWrite, writtenBytes);
95+
writtenBytes += bytesToWrite.byteLength;
96+
}
97+
98+
function writeToDestination(destination: Destination, view: Uint8Array) {
99+
const currentHasCapacity = destination.write(view);
100+
destinationHasCapacity = destinationHasCapacity && currentHasCapacity;
49101
}
50102

51103
export function writeChunkAndReturn(
52104
destination: Destination,
53-
chunk: Chunk | PrecomputedChunk,
105+
chunk: PrecomputedChunk | Chunk,
54106
): boolean {
55-
const nodeBuffer = ((chunk: any): Buffer | string); // close enough
56-
return destination.write(nodeBuffer);
107+
writeChunk(destination, chunk);
108+
return destinationHasCapacity;
57109
}
58110

59111
export function completeWriting(destination: Destination) {
60-
// Older Node streams like http.createServer don't have this.
61-
if (typeof destination.uncork === 'function') {
62-
destination.uncork();
112+
if (currentView && writtenBytes > 0) {
113+
destination.write(currentView.subarray(0, writtenBytes));
63114
}
115+
currentView = null;
116+
writtenBytes = 0;
117+
destinationHasCapacity = true;
64118
}
65119

66120
export function close(destination: Destination) {
67121
destination.end();
68122
}
69123

124+
const textEncoder = new TextEncoder();
125+
70126
export function stringToChunk(content: string): Chunk {
71-
return content;
127+
return textEncoder.encode(content);
72128
}
73129

74130
export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
75-
return Buffer.from(content, 'utf8');
131+
return textEncoder.encode(content);
76132
}
77133

78134
export function closeWithError(destination: Destination, error: mixed): void {

0 commit comments

Comments
 (0)