Skip to content

Commit 8bdf558

Browse files
committed
stream: iterator helpers synchronous errors
`streams/operators/map` is no longer a generator function, instead it returns a called generator so that validation can be synchronous and not wait for the first iteration Fixes: #41648
1 parent e2e2bc8 commit 8bdf558

File tree

4 files changed

+118
-139
lines changed

4 files changed

+118
-139
lines changed

lib/internal/streams/operators.js

Lines changed: 103 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const {
2626
const kEmpty = Symbol('kEmpty');
2727
const kEof = Symbol('kEof');
2828

29-
async function * map(fn, options) {
29+
function map(fn, options) {
3030
if (typeof fn !== 'function') {
3131
throw new ERR_INVALID_ARG_TYPE(
3232
'fn', ['Function', 'AsyncFunction'], fn);
@@ -43,118 +43,121 @@ async function * map(fn, options) {
4343

4444
validateInteger(concurrency, 'concurrency', 1);
4545

46-
const ac = new AbortController();
4746
const stream = this;
48-
const queue = [];
49-
const signal = ac.signal;
50-
const signalOpt = { signal };
51-
52-
const abort = () => ac.abort();
53-
if (options?.signal?.aborted) {
54-
abort();
55-
}
5647

57-
options?.signal?.addEventListener('abort', abort);
48+
return async function* map() {
49+
const ac = new AbortController();
50+
const queue = [];
51+
const signal = ac.signal;
52+
const signalOpt = { signal };
5853

59-
let next;
60-
let resume;
61-
let done = false;
62-
63-
function onDone() {
64-
done = true;
65-
}
54+
const abort = () => ac.abort();
55+
if (options?.signal?.aborted) {
56+
abort();
57+
}
6658

67-
async function pump() {
68-
try {
69-
for await (let val of stream) {
70-
if (done) {
71-
return;
72-
}
59+
options?.signal?.addEventListener('abort', abort);
7360

74-
if (signal.aborted) {
75-
throw new AbortError();
76-
}
61+
let next;
62+
let resume;
63+
let done = false;
7764

78-
try {
79-
val = fn(val, signalOpt);
80-
} catch (err) {
81-
val = PromiseReject(err);
82-
}
83-
84-
if (val === kEmpty) {
85-
continue;
86-
}
65+
function onDone() {
66+
done = true;
67+
}
8768

88-
if (typeof val?.catch === 'function') {
89-
val.catch(onDone);
69+
async function pump() {
70+
try {
71+
for await (let val of stream) {
72+
if (done) {
73+
return;
74+
}
75+
76+
if (signal.aborted) {
77+
throw new AbortError();
78+
}
79+
80+
try {
81+
val = fn(val, signalOpt);
82+
} catch (err) {
83+
val = PromiseReject(err);
84+
}
85+
86+
if (val === kEmpty) {
87+
continue;
88+
}
89+
90+
if (typeof val?.catch === 'function') {
91+
val.catch(onDone);
92+
}
93+
94+
queue.push(val);
95+
if (next) {
96+
next();
97+
next = null;
98+
}
99+
100+
if (!done && queue.length && queue.length >= concurrency) {
101+
await new Promise((resolve) => {
102+
resume = resolve;
103+
});
104+
}
90105
}
91-
106+
queue.push(kEof);
107+
} catch (err) {
108+
const val = PromiseReject(err);
109+
PromisePrototypeCatch(val, onDone);
92110
queue.push(val);
111+
} finally {
112+
done = true;
93113
if (next) {
94114
next();
95115
next = null;
96116
}
97-
98-
if (!done && queue.length && queue.length >= concurrency) {
99-
await new Promise((resolve) => {
100-
resume = resolve;
101-
});
102-
}
103-
}
104-
queue.push(kEof);
105-
} catch (err) {
106-
const val = PromiseReject(err);
107-
PromisePrototypeCatch(val, onDone);
108-
queue.push(val);
109-
} finally {
110-
done = true;
111-
if (next) {
112-
next();
113-
next = null;
117+
options?.signal?.removeEventListener('abort', abort);
114118
}
115-
options?.signal?.removeEventListener('abort', abort);
116119
}
117-
}
118-
119-
pump();
120-
121-
try {
122-
while (true) {
123-
while (queue.length > 0) {
124-
const val = await queue[0];
125120

126-
if (val === kEof) {
127-
return;
128-
}
129-
130-
if (signal.aborted) {
131-
throw new AbortError();
132-
}
121+
pump();
133122

134-
if (val !== kEmpty) {
135-
yield val;
123+
try {
124+
while (true) {
125+
while (queue.length > 0) {
126+
const val = await queue[0];
127+
128+
if (val === kEof) {
129+
return;
130+
}
131+
132+
if (signal.aborted) {
133+
throw new AbortError();
134+
}
135+
136+
if (val !== kEmpty) {
137+
yield val;
138+
}
139+
140+
queue.shift();
141+
if (resume) {
142+
resume();
143+
resume = null;
144+
}
136145
}
137146

138-
queue.shift();
139-
if (resume) {
140-
resume();
141-
resume = null;
142-
}
147+
await new Promise((resolve) => {
148+
next = resolve;
149+
});
143150
}
151+
} finally {
152+
ac.abort();
144153

145-
await new Promise((resolve) => {
146-
next = resolve;
147-
});
148-
}
149-
} finally {
150-
ac.abort();
151-
152-
done = true;
153-
if (resume) {
154-
resume();
155-
resume = null;
154+
done = true;
155+
if (resume) {
156+
resume();
157+
resume = null;
158+
}
156159
}
157-
}
160+
}();
158161
}
159162

160163
async function some(fn, options) {
@@ -204,7 +207,7 @@ async function forEach(fn, options) {
204207
for await (const unused of this.map(forEachFn, options));
205208
}
206209

207-
async function * filter(fn, options) {
210+
function filter(fn, options) {
208211
if (typeof fn !== 'function') {
209212
throw new ERR_INVALID_ARG_TYPE(
210213
'fn', ['Function', 'AsyncFunction'], fn);
@@ -215,7 +218,7 @@ async function * filter(fn, options) {
215218
}
216219
return kEmpty;
217220
}
218-
yield* this.map(filterFn, options);
221+
return this.map(filterFn, options);
219222
}
220223

221224
async function toArray(options) {
@@ -229,10 +232,13 @@ async function toArray(options) {
229232
return result;
230233
}
231234

232-
async function* flatMap(fn, options) {
233-
for await (const val of this.map(fn, options)) {
234-
yield* val;
235-
}
235+
function flatMap(fn, options) {
236+
const values = this.map(fn, options);
237+
return async function* flatMap() {
238+
for await (const val of values) {
239+
yield* val;
240+
}
241+
}();
236242
}
237243

238244
function toIntegerOrInfinity(number) {

test/parallel/test-stream-filter.js

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,20 +87,11 @@ const { setTimeout } = require('timers/promises');
8787

8888
{
8989
// Error cases
90-
assert.rejects(async () => {
91-
// eslint-disable-next-line no-unused-vars
92-
for await (const unused of Readable.from([1]).filter(1));
93-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
94-
assert.rejects(async () => {
95-
// eslint-disable-next-line no-unused-vars
96-
for await (const _ of Readable.from([1]).filter((x) => x, {
97-
concurrency: 'Foo'
98-
}));
99-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
100-
assert.rejects(async () => {
101-
// eslint-disable-next-line no-unused-vars
102-
for await (const _ of Readable.from([1]).filter((x) => x, 1));
103-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
90+
assert.throws(() => Readable.from([1]).filter(1), /ERR_INVALID_ARG_TYPE/);
91+
assert.throws(() => Readable.from([1]).filter((x) => x, {
92+
concurrency: 'Foo'
93+
}), /ERR_OUT_OF_RANGE/);
94+
assert.throws(() => Readable.from([1]).filter((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
10495
}
10596
{
10697
// Test result is a Readable

test/parallel/test-stream-flatMap.js

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,11 @@ function oneTo5() {
109109

110110
{
111111
// Error cases
112-
assert.rejects(async () => {
113-
// eslint-disable-next-line no-unused-vars
114-
for await (const unused of Readable.from([1]).flatMap(1));
115-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
116-
assert.rejects(async () => {
117-
// eslint-disable-next-line no-unused-vars
118-
for await (const _ of Readable.from([1]).flatMap((x) => x, {
119-
concurrency: 'Foo'
120-
}));
121-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
122-
assert.rejects(async () => {
123-
// eslint-disable-next-line no-unused-vars
124-
for await (const _ of Readable.from([1]).flatMap((x) => x, 1));
125-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
112+
assert.throws(() => Readable.from([1]).flatMap(1), /ERR_INVALID_ARG_TYPE/);
113+
assert.throws(() => Readable.from([1]).flatMap((x) => x, {
114+
concurrency: 'Foo'
115+
}), /ERR_OUT_OF_RANGE/);
116+
assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
126117
}
127118
{
128119
// Test result is a Readable

test/parallel/test-stream-map.js

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -175,20 +175,11 @@ const { setTimeout } = require('timers/promises');
175175

176176
{
177177
// Error cases
178-
assert.rejects(async () => {
179-
// eslint-disable-next-line no-unused-vars
180-
for await (const unused of Readable.from([1]).map(1));
181-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
182-
assert.rejects(async () => {
183-
// eslint-disable-next-line no-unused-vars
184-
for await (const _ of Readable.from([1]).map((x) => x, {
185-
concurrency: 'Foo'
186-
}));
187-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
188-
assert.rejects(async () => {
189-
// eslint-disable-next-line no-unused-vars
190-
for await (const _ of Readable.from([1]).map((x) => x, 1));
191-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
178+
assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
179+
assert.throws(() => Readable.from([1]).map((x) => x, {
180+
concurrency: 'Foo'
181+
}), /ERR_OUT_OF_RANGE/);
182+
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
192183
}
193184
{
194185
// Test result is a Readable

0 commit comments

Comments
 (0)