Skip to content

Commit dce9344

Browse files
committed
streams: enable usage of webstreams on compose()
Refs: #39316
1 parent f46515c commit dce9344

File tree

2 files changed

+117
-31
lines changed

2 files changed

+117
-31
lines changed

lib/internal/streams/compose.js

Lines changed: 116 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ const {
77
isNodeStream,
88
isReadable,
99
isWritable,
10+
isWebStream,
11+
isTransformStream,
12+
isWritableStream,
13+
isReadableStream,
1014
} = require('internal/streams/utils');
1115
const {
1216
AbortError,
@@ -15,6 +19,7 @@ const {
1519
ERR_MISSING_ARGS,
1620
},
1721
} = require('internal/errors');
22+
const eos = require('internal/streams/end-of-stream');
1823

1924
module.exports = function compose(...streams) {
2025
if (streams.length === 0) {
@@ -57,9 +62,8 @@ module.exports = function compose(...streams) {
5762
}
5863
}
5964

60-
let ondrain;
61-
let onfinish;
62-
let onreadable;
65+
let writableEndDestructor;
66+
let readableEndDestructor;
6367
let onclose;
6468
let d;
6569

@@ -79,8 +83,8 @@ module.exports = function compose(...streams) {
7983
const head = streams[0];
8084
const tail = pipeline(streams, onfinished);
8185

82-
const writable = !!isWritable(head);
83-
const readable = !!isReadable(tail);
86+
const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head));
87+
const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail));
8488

8589
// TODO(ronag): Avoid double buffering.
8690
// Implement Writable/Readable/Duplex traits.
@@ -94,15 +98,51 @@ module.exports = function compose(...streams) {
9498
});
9599

96100
if (writable) {
97-
d._write = function(chunk, encoding, callback) {
101+
writableEndDestructor = makeWritableEnd(d, head, tail);
102+
}
103+
104+
if (readable) {
105+
readableEndDestructor = makeReadableEnd(d, head, tail);
106+
}
107+
108+
d._destroy = function(err, callback) {
109+
if (!err && onclose !== null) {
110+
err = new AbortError();
111+
}
112+
113+
if (readableEndDestructor) {
114+
readableEndDestructor();
115+
}
116+
117+
if (writableEndDestructor) {
118+
writableEndDestructor();
119+
}
120+
121+
if (onclose === null) {
122+
callback(err);
123+
} else {
124+
onclose = callback;
125+
destroyer(tail, err);
126+
}
127+
};
128+
129+
return d;
130+
};
131+
132+
function makeWritableEnd(duplex, head, tail) {
133+
let ondrain;
134+
let onfinish;
135+
136+
if (isNodeStream(head)) {
137+
duplex._write = function(chunk, encoding, callback) {
98138
if (head.write(chunk, encoding)) {
99139
callback();
100140
} else {
101141
ondrain = callback;
102142
}
103143
};
104144

105-
d._final = function(callback) {
145+
duplex._final = function(callback) {
106146
head.end();
107147
onfinish = callback;
108148
};
@@ -114,17 +154,61 @@ module.exports = function compose(...streams) {
114154
cb();
115155
}
116156
});
157+
} else if (isWebStream(head)) {
158+
const writable = isTransformStream(head) ? head.writable : head;
159+
const writer = writable.getWriter();
160+
161+
duplex._write = async function(chunk, encoding, callback) {
162+
try {
163+
await writer.ready;
164+
writer.write(chunk).catch(() => {});
165+
callback();
166+
} catch (err) {
167+
callback(err);
168+
}
169+
};
117170

171+
duplex._final = async function(callback) {
172+
try {
173+
await writer.ready;
174+
writer.close();
175+
onfinish = callback;
176+
} catch (err) {
177+
callback(err);
178+
}
179+
};
180+
}
181+
182+
if (isNodeStream(tail)) {
118183
tail.on('finish', function() {
119184
if (onfinish) {
120185
const cb = onfinish;
121186
onfinish = null;
122187
cb();
123188
}
124189
});
190+
} else if (isWebStream(tail)) {
191+
const readable = isTransformStream(tail) ? tail.readable : tail;
192+
eos(readable, () => {
193+
if (onfinish) {
194+
const cb = onfinish;
195+
onfinish = null;
196+
cb();
197+
}
198+
});
125199
}
126200

127-
if (readable) {
201+
function destructor() {
202+
ondrain = null;
203+
onfinish = null;
204+
}
205+
206+
return destructor;
207+
}
208+
209+
function makeReadableEnd(duplex, head, tail) {
210+
let onreadable;
211+
if (isNodeStream(tail)) {
128212
tail.on('readable', function() {
129213
if (onreadable) {
130214
const cb = onreadable;
@@ -134,41 +218,43 @@ module.exports = function compose(...streams) {
134218
});
135219

136220
tail.on('end', function() {
137-
d.push(null);
221+
duplex.push(null);
138222
});
139223

140-
d._read = function() {
224+
duplex._read = function() {
141225
while (true) {
142226
const buf = tail.read();
143-
144227
if (buf === null) {
145-
onreadable = d._read;
228+
onreadable = duplex._read;
146229
return;
147230
}
148231

149-
if (!d.push(buf)) {
232+
if (!duplex.push(buf)) {
150233
return;
151234
}
152235
}
153236
};
154-
}
237+
} else if (isWebStream(tail)) {
238+
const readable = isTransformStream(tail) ? tail.readable : tail;
239+
const reader = readable.getReader();
240+
duplex._read = async function() {
241+
while (true) {
242+
const { value, done } = await reader.read();
243+
if (done) {
244+
duplex.push(null);
245+
return;
246+
}
155247

156-
d._destroy = function(err, callback) {
157-
if (!err && onclose !== null) {
158-
err = new AbortError();
159-
}
248+
if (!duplex.push(value)) {
249+
return;
250+
}
251+
}
252+
};
253+
}
160254

255+
function destructor() {
161256
onreadable = null;
162-
ondrain = null;
163-
onfinish = null;
164-
165-
if (onclose === null) {
166-
callback(err);
167-
} else {
168-
onclose = callback;
169-
destroyer(tail, err);
170-
}
171-
};
257+
}
172258

173-
return d;
174-
};
259+
return destructor;
260+
}

lib/internal/streams/pipeline.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ function pipelineImpl(streams, callback, opts) {
285285
throw new ERR_INVALID_RETURN_VALUE(
286286
'Iterable, AsyncIterable or Stream', 'source', ret);
287287
}
288-
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
288+
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
289289
ret = stream;
290290
} else {
291291
ret = Duplex.from(stream);

0 commit comments

Comments
 (0)