Skip to content

Commit 62f1ed7

Browse files
authored
Support async iterables (#16)
1 parent ef2b8dc commit 62f1ed7

File tree

6 files changed

+126
-12
lines changed

6 files changed

+126
-12
lines changed

index.d.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ declare namespace intoStream {
77
| NodeJS.TypedArray
88
| ArrayBuffer
99
| string
10-
| Iterable<Buffer | string>;
10+
| Iterable<Buffer | string>
11+
| AsyncIterable<Buffer | string>;
1112

1213
type InputObject =
1314
| object
14-
| Iterable<object>;
15+
| Iterable<object>
16+
| AsyncIterable<object>;
1517
}
1618

1719
declare const intoStream: {

index.js

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const intoStream = input => {
99

1010
let promise;
1111
let iterator;
12+
let asyncIterator;
1213

1314
prepare(input);
1415

@@ -27,6 +28,9 @@ const intoStream = input => {
2728
// We don't iterate on strings and buffers since slicing them is ~7x faster
2829
const shouldIterate = !promise && input[Symbol.iterator] && typeof input !== 'string' && !Buffer.isBuffer(input);
2930
iterator = shouldIterate ? input[Symbol.iterator]() : null;
31+
32+
const shouldAsyncIterate = !promise && input[Symbol.asyncIterator];
33+
asyncIterator = shouldAsyncIterate ? input[Symbol.asyncIterator]() : null;
3034
}
3135

3236
return from(function reader(size, callback) {
@@ -49,6 +53,19 @@ const intoStream = input => {
4953
return;
5054
}
5155

56+
if (asyncIterator) {
57+
(async () => {
58+
try {
59+
const object = await asyncIterator.next();
60+
setImmediate(callback, null, object.done ? null : object.value);
61+
} catch (error) {
62+
setImmediate(callback, error);
63+
}
64+
})();
65+
66+
return;
67+
}
68+
5269
if (input.length === 0) {
5370
setImmediate(callback, null, null);
5471
return;
@@ -72,13 +89,15 @@ module.exports.object = input => {
7289

7390
let promise;
7491
let iterator;
92+
let asyncIterator;
7593

7694
prepare(input);
7795

7896
function prepare(value) {
7997
input = value;
8098
promise = pIsPromise(input) ? input : null;
8199
iterator = !promise && input[Symbol.iterator] ? input[Symbol.iterator]() : null;
100+
asyncIterator = !promise && input[Symbol.asyncIterator] ? input[Symbol.asyncIterator]() : null;
82101
}
83102

84103
return from.obj(function reader(size, callback) {
@@ -101,6 +120,19 @@ module.exports.object = input => {
101120
return;
102121
}
103122

123+
if (asyncIterator) {
124+
(async () => {
125+
try {
126+
const object = await asyncIterator.next();
127+
setImmediate(callback, null, object.done ? null : object.value);
128+
} catch (error) {
129+
setImmediate(callback, error);
130+
}
131+
})();
132+
133+
return;
134+
}
135+
104136
this.push(input);
105137

106138
setImmediate(callback, null, null);

index.test-d.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,18 @@ import intoStream = require('.');
22

33
const unicornArray = 'unicorn'.split('');
44

5+
function asyncGeneratorFrom<T>(array: T[]) {
6+
return async function * () {
7+
yield array[0];
8+
};
9+
}
10+
11+
function asyncIterableFrom<T>(array: T[]) {
12+
return {
13+
[Symbol.asyncIterator]: asyncGeneratorFrom(array)
14+
};
15+
}
16+
517
intoStream('unicorn').pipe(process.stdout);
618
intoStream(unicornArray).pipe(process.stdout);
719
intoStream(new Set(unicornArray)).pipe(process.stdout);
@@ -22,8 +34,12 @@ intoStream(Promise.resolve(new Uint8Array(Buffer.from('unicorn').buffer))).pipe(
2234
process.stdout
2335
);
2436

37+
intoStream(asyncGeneratorFrom(unicornArray)()).pipe(process.stdout);
38+
intoStream(asyncIterableFrom(unicornArray)).pipe(process.stdout);
39+
intoStream(Promise.resolve(asyncIterableFrom(unicornArray))).pipe(process.stdout);
40+
2541
const object = {foo: true};
26-
const objectArray = new Set([object, {bar: true}]);
42+
const objectArray = [object, {bar: true}];
2743
const objectIterable = new Set(objectArray);
2844
const arrayOfArrays = [[object]];
2945

@@ -35,3 +51,6 @@ intoStream.object(Promise.resolve(object)).pipe(process.stdout);
3551
intoStream.object(Promise.resolve(objectArray)).pipe(process.stdout);
3652
intoStream.object(Promise.resolve(objectIterable)).pipe(process.stdout);
3753
intoStream.object(Promise.resolve(arrayOfArrays)).pipe(process.stdout);
54+
intoStream.object(asyncGeneratorFrom(objectArray)()).pipe(process.stdout);
55+
intoStream.object(asyncIterableFrom(objectArray)).pipe(process.stdout);
56+
intoStream.object(Promise.resolve(asyncIterableFrom(objectArray))).pipe(process.stdout);

package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "into-stream",
33
"version": "5.1.1",
4-
"description": "Convert a string/promise/array/iterable/buffer/typedarray/arraybuffer/object into a stream",
4+
"description": "Convert a string/promise/array/iterable/asynciterable/buffer/typedarray/arraybuffer/object into a stream",
55
"license": "MIT",
66
"repository": "sindresorhus/into-stream",
77
"author": {
@@ -26,6 +26,8 @@
2626
"object",
2727
"array",
2828
"iterable",
29+
"async",
30+
"asynciterable",
2931
"promise",
3032
"promises",
3133
"from",
@@ -47,6 +49,7 @@
4749
"ava": "^2.4.0",
4850
"get-stream": "^5.0.0",
4951
"p-event": "^4.1.0",
52+
"p-immediate": "^3.1.0",
5053
"tsd": "^0.9.0",
5154
"xo": "^0.25.3"
5255
}

readme.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# into-stream [![Build Status](https://travis-ci.org/sindresorhus/into-stream.svg?branch=master)](https://travis-ci.org/sindresorhus/into-stream)
22

3-
> Convert a string/promise/array/iterable/buffer/typedarray/arraybuffer/object into a stream
3+
> Convert a string/promise/array/iterable/asynciterable/buffer/typedarray/arraybuffer/object into a stream
44
55
Correctly chunks up the input and handles backpressure.
66

@@ -26,14 +26,14 @@ intoStream('unicorn').pipe(process.stdout);
2626

2727
### intoStream(input)
2828

29-
Type: `Buffer | TypedArray | ArrayBuffer | string | Iterable<Buffer | string> | Promise`<br>
29+
Type: `Buffer | TypedArray | ArrayBuffer | string | Iterable<Buffer | string> | AsyncIterable<Buffer | string> | Promise`<br>
3030
Returns: [Readable stream](https://nodejs.org/api/stream.html#stream_class_stream_readable)
3131

3232
Adheres to the requested chunk size, except for `array` where each element will be a chunk.
3333

3434
### intoStream.object(input)
3535

36-
Type: `object | Iterable<object> | Promise`<br>
36+
Type: `object | Iterable<object> | AsyncIterable<object> | Promise`<br>
3737
Returns: [Readable object stream](https://nodejs.org/api/stream.html#stream_object_mode)
3838

3939

test.js

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,43 @@
11
import test from 'ava';
22
import getStream from 'get-stream';
33
import pEvent from 'p-event';
4+
import pImmediate from 'p-immediate';
45
import intoStream from '.';
56

67
const fixture = 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium.';
78

9+
function generatorFrom(array) {
10+
return function * () {
11+
let i = 0;
12+
while (i < array.length) {
13+
yield array[i++];
14+
}
15+
};
16+
}
17+
818
function iterableFrom(array) {
919
return {
10-
* [Symbol.iterator]() {
11-
let i = 0;
12-
while (i < array.length) {
13-
yield array[i++];
14-
}
20+
[Symbol.iterator]: generatorFrom(array)
21+
};
22+
}
23+
24+
function asyncGeneratorFrom(array) {
25+
return async function * () {
26+
let i = 0;
27+
while (i < array.length) {
28+
// eslint-disable-next-line no-await-in-loop
29+
await pImmediate();
30+
yield array[i++];
1531
}
1632
};
1733
}
1834

35+
function asyncIterableFrom(array) {
36+
return {
37+
[Symbol.asyncIterator]: asyncGeneratorFrom(array)
38+
};
39+
}
40+
1941
test('string', async t => {
2042
t.is(await getStream(intoStream(fixture)), fixture);
2143
});
@@ -54,6 +76,21 @@ test('iterable', async t => {
5476
t.is(await getStream(intoStream(iterable)), fixture);
5577
});
5678

79+
test('generator', async t => {
80+
const generator = generatorFrom(fixture.split(''));
81+
t.is(await getStream(intoStream(generator())), fixture);
82+
});
83+
84+
test('async iterable', async t => {
85+
const iterable = asyncIterableFrom(fixture.split(''));
86+
t.is(await getStream(intoStream(iterable)), fixture);
87+
});
88+
89+
test('async generator', async t => {
90+
const generator = asyncGeneratorFrom(fixture.split(''));
91+
t.is(await getStream(intoStream(generator())), fixture);
92+
});
93+
5794
test('promise', async t => {
5895
const promise = new Promise(resolve => {
5996
setImmediate(resolve.bind(null, fixture));
@@ -68,6 +105,13 @@ test('promise resolving to iterable', async t => {
68105
t.is(await getStream(intoStream(promise)), fixture);
69106
});
70107

108+
test('promise resolving to async iterable', async t => {
109+
const promise = new Promise(resolve => {
110+
setImmediate(resolve.bind(null, asyncIterableFrom(fixture.split(''))));
111+
});
112+
t.is(await getStream(intoStream(promise)), fixture);
113+
});
114+
71115
test('stream errors when promise rejects', async t => {
72116
const promise = new Promise((resolve, reject) => {
73117
setImmediate(reject.bind(null, new Error('test error')));
@@ -89,6 +133,12 @@ test('object mode from iterable', async t => {
89133
t.deepEqual(await getStream.array(intoStream.object(iterable)), values);
90134
});
91135

136+
test('object mode from async iterable', async t => {
137+
const values = [{foo: true}, {bar: true}];
138+
const iterable = asyncIterableFrom(values);
139+
t.deepEqual(await getStream.array(intoStream.object(iterable)), values);
140+
});
141+
92142
test('object mode from promise', async t => {
93143
const f = {foo: true};
94144
const promise = new Promise(resolve => {
@@ -111,6 +161,14 @@ test('object mode from promise resolving to iterable', async t => {
111161
t.deepEqual(await getStream.array(intoStream.object(promise)), values);
112162
});
113163

164+
test('object mode from promise resolving to async iterable', async t => {
165+
const values = [{foo: true}, {bar: true}];
166+
const promise = new Promise(resolve => {
167+
setImmediate(resolve.bind(null, asyncIterableFrom([{foo: true}, {bar: true}])));
168+
});
169+
t.deepEqual(await getStream.array(intoStream.object(promise)), values);
170+
});
171+
114172
test('object mode errors when promise rejects', async t => {
115173
const promise = new Promise((resolve, reject) => {
116174
setImmediate(reject.bind(null, new Error('test error')));

0 commit comments

Comments
 (0)