@@ -8,6 +8,25 @@ const assert = require('assert');
88const { once } = require ( 'events' ) ;
99const { setTimeout } = require ( 'timers/promises' ) ;
1010
11+ function createDependentPromises ( n ) {
12+ const promiseAndResolveArray = [ ] ;
13+
14+ for ( let i = 0 ; i < n ; i ++ ) {
15+ let res ;
16+ const promise = new Promise ( ( resolve ) => {
17+ if ( i === 0 ) {
18+ res = resolve ;
19+ return ;
20+ }
21+ res = ( ) => promiseAndResolveArray [ i - 1 ] [ 0 ] . then ( resolve ) ;
22+ } ) ;
23+
24+ promiseAndResolveArray . push ( [ promise , res ] ) ;
25+ }
26+
27+ return promiseAndResolveArray ;
28+ }
29+
1130{
1231 // Map works on synchronous streams with a synchronous mapper
1332 const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( ( x ) => x + x ) ;
@@ -143,7 +162,7 @@ const { setTimeout } = require('timers/promises');
143162 const stream = range . map ( common . mustCall ( async ( _ , { signal } ) => {
144163 await once ( signal , 'abort' ) ;
145164 throw signal . reason ;
146- } , 2 ) , { signal : ac . signal , concurrency : 2 } ) ;
165+ } , 2 ) , { signal : ac . signal , concurrency : 2 , highWaterMark : 0 } ) ;
147166 // pump
148167 assert . rejects ( async ( ) => {
149168 for await ( const item of stream ) {
@@ -173,12 +192,164 @@ const { setTimeout } = require('timers/promises');
173192 } ) ( ) . then ( common . mustCall ( ) ) ;
174193}
175194
195+
196+ {
197+ // highWaterMark with small concurrency
198+ const finishOrder = [ ] ;
199+
200+ const promises = createDependentPromises ( 4 ) ;
201+
202+ const raw = Readable . from ( [ 2 , 0 , 1 , 3 ] ) ;
203+ const stream = raw . map ( async ( item ) => {
204+ const [ promise , resolve ] = promises [ item ] ;
205+ resolve ( ) ;
206+
207+ await promise ;
208+ finishOrder . push ( item ) ;
209+ return item ;
210+ } , { concurrency : 2 } ) ;
211+
212+ ( async ( ) => {
213+ await stream . toArray ( ) ;
214+
215+ assert . deepStrictEqual ( finishOrder , [ 0 , 1 , 2 , 3 ] ) ;
216+ } ) ( ) . then ( common . mustCall ( ) , common . mustNotCall ( ) ) ;
217+ }
218+
219+ {
220+ // highWaterMark with a lot of items and large concurrency
221+ const finishOrder = [ ] ;
222+
223+ const promises = createDependentPromises ( 20 ) ;
224+
225+ const input = [ 10 , 1 , 0 , 3 , 4 , 2 , 5 , 7 , 8 , 9 , 6 , 11 , 12 , 13 , 18 , 15 , 16 , 17 , 14 , 19 ] ;
226+ const raw = Readable . from ( input ) ;
227+ // Should be
228+ // 10, 1, 0, 3, 4, 2 | next: 0
229+ // 10, 1, 3, 4, 2, 5 | next: 1
230+ // 10, 3, 4, 2, 5, 7 | next: 2
231+ // 10, 3, 4, 5, 7, 8 | next: 3
232+ // 10, 4, 5, 7, 8, 9 | next: 4
233+ // 10, 5, 7, 8, 9, 6 | next: 5
234+ // 10, 7, 8, 9, 6, 11 | next: 6
235+ // 10, 7, 8, 9, 11, 12 | next: 7
236+ // 10, 8, 9, 11, 12, 13 | next: 8
237+ // 10, 9, 11, 12, 13, 18 | next: 9
238+ // 10, 11, 12, 13, 18, 15 | next: 10
239+ // 11, 12, 13, 18, 15, 16 | next: 11
240+ // 12, 13, 18, 15, 16, 17 | next: 12
241+ // 13, 18, 15, 16, 17, 14 | next: 13
242+ // 18, 15, 16, 17, 14, 19 | next: 14
243+ // 18, 15, 16, 17, 19 | next: 15
244+ // 18, 16, 17, 19 | next: 16
245+ // 18, 17, 19 | next: 17
246+ // 18, 19 | next: 18
247+ // 19 | next: 19
248+ //
249+
250+ const stream = raw . map ( async ( item ) => {
251+ const [ promise , resolve ] = promises [ item ] ;
252+ resolve ( ) ;
253+
254+ await promise ;
255+ finishOrder . push ( item ) ;
256+ return item ;
257+ } , { concurrency : 6 } ) ;
258+
259+ ( async ( ) => {
260+ const outputOrder = await stream . toArray ( ) ;
261+
262+ assert . deepStrictEqual ( outputOrder , input ) ;
263+ assert . deepStrictEqual ( finishOrder , [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 ] ) ;
264+ } ) ( ) . then ( common . mustCall ( ) , common . mustNotCall ( ) ) ;
265+ }
266+
267+ {
268+ // Custom highWaterMark with a lot of items and large concurrency
269+ const finishOrder = [ ] ;
270+
271+ const promises = createDependentPromises ( 20 ) ;
272+
273+ const input = [ 11 , 1 , 0 , 3 , 4 , 2 , 5 , 7 , 8 , 9 , 6 , 10 , 12 , 13 , 18 , 15 , 16 , 17 , 14 , 19 ] ;
274+ const raw = Readable . from ( input ) ;
275+ // Should be
276+ // 11, 1, 0, 3, 4 | next: 0, buffer: []
277+ // 11, 1, 3, 4, 2 | next: 1, buffer: [0]
278+ // 11, 3, 4, 2, 5 | next: 2, buffer: [0, 1]
279+ // 11, 3, 4, 5, 7 | next: 3, buffer: [0, 1, 2]
280+ // 11, 4, 5, 7, 8 | next: 4, buffer: [0, 1, 2, 3]
281+ // 11, 5, 7, 8, 9 | next: 5, buffer: [0, 1, 2, 3, 4]
282+ // 11, 7, 8, 9, 6 | next: 6, buffer: [0, 1, 2, 3, 4, 5]
283+ // 11, 7, 8, 9, 10 | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full
284+ // 11, 8, 9, 10, 12 | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6]
285+ // 11, 9, 10, 12, 13 | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6]
286+ // 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6]
287+ // 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6]
288+ // 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it
289+ // 13, 18, 15, 16, 17 | next: 13, buffer: []
290+ // 18, 15, 16, 17, 14 | next: 14, buffer: []
291+ // 18, 15, 16, 17, 19 | next: 15, buffer: [14]
292+ // 18, 16, 17, 19 | next: 16, buffer: [14, 15]
293+ // 18, 17, 19 | next: 17, buffer: [14, 15, 16]
294+ // 18, 19 | next: 18, buffer: [14, 15, 16, 17]
295+ // 19 | next: 19, buffer: [] -- all items flushed
296+ //
297+
298+ const stream = raw . map ( async ( item ) => {
299+ const [ promise , resolve ] = promises [ item ] ;
300+ resolve ( ) ;
301+
302+ await promise ;
303+ finishOrder . push ( item ) ;
304+ return item ;
305+ } , { concurrency : 5 , highWaterMark : 7 } ) ;
306+
307+ ( async ( ) => {
308+ const outputOrder = await stream . toArray ( ) ;
309+
310+ assert . deepStrictEqual ( outputOrder , input ) ;
311+ assert . deepStrictEqual ( finishOrder , [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 ] ) ;
312+ } ) ( ) . then ( common . mustCall ( ) , common . mustNotCall ( ) ) ;
313+ }
314+
315+ {
316+ // Where there is a delay between the first and the next item it should not wait for filled queue
317+ // before yielding to the user
318+ const promises = createDependentPromises ( 3 ) ;
319+
320+ const raw = Readable . from ( [ 0 , 1 , 2 ] ) ;
321+
322+ const stream = raw
323+ . map ( async ( item ) => {
324+ if ( item !== 0 ) {
325+ await promises [ item ] [ 0 ] ;
326+ }
327+
328+ return item ;
329+ } , { concurrency : 2 } )
330+ . map ( ( item ) => {
331+ // eslint-disable-next-line no-unused-vars
332+ for ( const [ _ , resolve ] of promises ) {
333+ resolve ( ) ;
334+ }
335+
336+ return item ;
337+ } ) ;
338+
339+ ( async ( ) => {
340+ await stream . toArray ( ) ;
341+ } ) ( ) . then ( common . mustCall ( ) , common . mustNotCall ( ) ) ;
342+ }
343+
176344{
177345 // Error cases
178346 assert . throws ( ( ) => Readable . from ( [ 1 ] ) . map ( 1 ) , / E R R _ I N V A L I D _ A R G _ T Y P E / ) ;
179347 assert . throws ( ( ) => Readable . from ( [ 1 ] ) . map ( ( x ) => x , {
180348 concurrency : 'Foo'
181349 } ) , / E R R _ O U T _ O F _ R A N G E / ) ;
350+ assert . throws ( ( ) => Readable . from ( [ 1 ] ) . map ( ( x ) => x , {
351+ concurrency : - 1
352+ } ) , / E R R _ O U T _ O F _ R A N G E / ) ;
182353 assert . throws ( ( ) => Readable . from ( [ 1 ] ) . map ( ( x ) => x , 1 ) , / E R R _ I N V A L I D _ A R G _ T Y P E / ) ;
183354 assert . throws ( ( ) => Readable . from ( [ 1 ] ) . map ( ( x ) => x , { signal : true } ) , / E R R _ I N V A L I D _ A R G _ T Y P E / ) ;
184355}
0 commit comments