Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1681,6 +1681,41 @@ option. In the code example above, data will be in a single chunk if the file
has less then 64 KiB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].

##### `readable.compose(stream[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `stream` {Stream|Iterable|AsyncIterable|Function}
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Duplex} a stream composed with the stream `stream`.

```mjs
import { Readable } from 'node:stream';

async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ');

for (const word of words) {
yield word;
}
}
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();

console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
```

See [`stream.compose`][] for more information.

##### `readable.iterator([options])`

<!-- YAML
Expand Down Expand Up @@ -2715,6 +2750,8 @@ await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
```

See [`readable.compose(stream)`][] for `stream.compose` as operator.

### `stream.Readable.from(iterable[, options])`

<!-- YAML
Expand Down Expand Up @@ -4482,11 +4519,13 @@ contain multi-byte characters.
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
[`readable._read()`]: #readable_readsize
[`readable.compose(stream)`]: #readablecomposestream-options
[`readable.map`]: #readablemapfn-options
[`readable.push('')`]: #readablepush
[`readable.setEncoding()`]: #readablesetencodingencoding
[`stream.Readable.from()`]: #streamreadablefromiterable-options
[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream
[`stream.compose`]: #streamcomposestreams
[`stream.cork()`]: #writablecork
[`stream.finished()`]: #streamfinishedstream-options-callback
[`stream.pipe()`]: #readablepipedestination-options
Expand Down
32 changes: 32 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const { AbortController } = require('internal/abort_controller');

const {
codes: {
ERR_INVALID_ARG_VALUE,
ERR_INVALID_ARG_TYPE,
ERR_MISSING_ARGS,
ERR_OUT_OF_RANGE,
Expand All @@ -17,6 +18,11 @@ const {
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');
const staticCompose = require('internal/streams/compose');
const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');
const { isWritable, isNodeStream } = require('internal/streams/utils');

const {
ArrayPrototypePush,
Expand All @@ -32,6 +38,31 @@ const {
const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');

function compose(stream, options) {
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}

if (isNodeStream(stream) && !isWritable(stream)) {
throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable');
}

const composedStream = staticCompose(this, stream);

if (options?.signal) {
// Not validating as we already validated before
addAbortSignalNoValidate(
options.signal,
composedStream
);
}

return composedStream;
}

function map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -392,6 +423,7 @@ module.exports.streamReturningOperators = {
flatMap,
map,
take,
compose,
};

module.exports.promiseReturningOperators = {
Expand Down
127 changes: 127 additions & 0 deletions test/parallel/test-stream-compose-operator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
'use strict';

const common = require('../common');
const {
Readable, Transform,
} = require('stream');
const assert = require('assert');

{
// with async generator
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) {
let str = '';
for await (const chunk of stream) {
str += chunk;

if (str.length === 2) {
yield str;
str = '';
}
}
});
const result = ['ab', 'cd'];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// With Transformer
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({
objectMode: true,
transform: common.mustCall((chunk, encoding, callback) => {
callback(null, chunk);
}, 4)
}));
const result = ['a', 'b', 'c', 'd'];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Throwing an error during `compose` (before waiting for data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield

throw new Error('boom');
});

assert.rejects(async () => {
for await (const item of stream) {
assert.fail('should not reach here, got ' + item);
}
}, /boom/).then(common.mustCall());
}

{
// Throwing an error during `compose` (when waiting for data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) {
for await (const chunk of stream) {
if (chunk === 3) {
throw new Error('boom');
}
yield chunk;
}
});

assert.rejects(
stream.toArray(),
/boom/,
).then(common.mustCall());
}

{
// Throwing an error during `compose` (after finishing all readable data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield

// eslint-disable-next-line no-unused-vars,no-empty
for await (const chunk of stream) {
}

throw new Error('boom');
});
assert.rejects(
stream.toArray(),
/boom/,
).then(common.mustCall());
}

{
// AbortSignal
const ac = new AbortController();
const stream = Readable.from([1, 2, 3, 4, 5])
.compose(async function *(source) {
// Should not reach here
for await (const chunk of source) {
yield chunk;
}
}, { signal: ac.signal });

ac.abort();

assert.rejects(async () => {
for await (const item of stream) {
assert.fail('should not reach here, got ' + item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());
}

{
assert.throws(
() => Readable.from(['a']).compose(Readable.from(['b'])),
{ code: 'ERR_INVALID_ARG_VALUE' }
);
}

{
assert.throws(
() => Readable.from(['a']).compose(),
{ code: 'ERR_INVALID_ARG_TYPE' }
);
}
27 changes: 12 additions & 15 deletions test/parallel/test-stream-compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -358,27 +358,24 @@ const assert = require('assert');
}

{
try {
compose();
} catch (err) {
assert.strictEqual(err.code, 'ERR_MISSING_ARGS');
}
assert.throws(
() => compose(),
{ code: 'ERR_MISSING_ARGS' }
);
}

{
try {
compose(new Writable(), new PassThrough());
} catch (err) {
assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
}
assert.throws(
() => compose(new Writable(), new PassThrough()),
{ code: 'ERR_INVALID_ARG_VALUE' }
);
}

{
try {
compose(new PassThrough(), new Readable({ read() {} }), new PassThrough());
} catch (err) {
assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
}
assert.throws(
() => compose(new PassThrough(), new Readable({ read() {} }), new PassThrough()),
{ code: 'ERR_INVALID_ARG_VALUE' }
);
}

{
Expand Down