@@ -5,6 +5,7 @@ const assert = require('assert');
55const { Duplex, Readable, Writable, pipeline, PassThrough } = require ( 'stream' ) ;
66const { ReadableStream, WritableStream } = require ( 'stream/web' ) ;
77const { Blob } = require ( 'buffer' ) ;
8+ const sleep = require ( 'util' ) . promisify ( setTimeout ) ;
89
910{
1011 const d = Duplex . from ( {
@@ -401,3 +402,193 @@ function makeATestWritableStream(writeFunc) {
401402 assert . strictEqual ( d . writable , false ) ;
402403 } ) ) ;
403404}
405+
406+ {
407+ const r = Readable . from ( [ 'foo' , 'bar' , 'baz' ] ) ;
408+ pipeline (
409+ r ,
410+ Duplex . from ( async function ( asyncGenerator ) {
411+ const values = await Array . fromAsync ( asyncGenerator ) ;
412+ assert . deepStrictEqual ( values , [ 'foo' , 'bar' , 'baz' ] ) ;
413+
414+ await asyncGenerator . return ( ) ;
415+ await asyncGenerator . return ( ) ;
416+ await asyncGenerator . return ( ) ;
417+ } ) ,
418+ common . mustSucceed ( ( ) => {
419+ assert . strictEqual ( r . destroyed , true ) ;
420+ } )
421+ ) ;
422+ }
423+
424+ {
425+ const r = Readable . from ( [ 'foo' , 'bar' , 'baz' ] ) ;
426+ pipeline (
427+ r ,
428+ Duplex . from ( async function ( asyncGenerator ) {
429+ // eslint-disable-next-line no-unused-vars
430+ for await ( const _ of asyncGenerator ) break ;
431+ } ) ,
432+ common . mustSucceed ( ( ) => {
433+ assert . strictEqual ( r . destroyed , true ) ;
434+ } )
435+ ) ;
436+ }
437+
438+ {
439+ const r = Readable . from ( [ 'foo' , 'bar' , 'baz' ] ) ;
440+ pipeline (
441+ r ,
442+ Duplex . from ( async function ( asyncGenerator ) {
443+ const a = await asyncGenerator . next ( ) ;
444+ assert . strictEqual ( a . done , false ) ;
445+ assert . strictEqual ( a . value . toString ( ) , 'foo' ) ;
446+ const b = await asyncGenerator . return ( ) ;
447+ assert . strictEqual ( b . done , true ) ;
448+ } ) ,
449+ common . mustSucceed ( ( ) => {
450+ assert . strictEqual ( r . destroyed , true ) ;
451+ } )
452+ ) ;
453+ }
454+
455+ {
456+ const r = Readable . from ( [ 'foo' , 'bar' , 'baz' ] ) ;
457+ pipeline (
458+ r ,
459+ Duplex . from ( async function ( asyncGenerator ) {
460+ // Note: the generator is not even started at this point
461+ await asyncGenerator . return ( ) ;
462+ } ) ,
463+ common . mustSucceed ( ( ) => {
464+ assert . strictEqual ( r . destroyed , true ) ;
465+ } )
466+ ) ;
467+ }
468+
469+ {
470+ const r = Readable . from ( [ 'foo' , 'bar' , 'baz' ] ) ;
471+ pipeline (
472+ r ,
473+ Duplex . from ( async function ( asyncGenerator ) {
474+ // Same as before, with a delay
475+ await sleep ( 100 ) ;
476+ await asyncGenerator . return ( ) ;
477+ } ) ,
478+ common . mustSucceed ( ( ) => {
479+ assert . strictEqual ( r . destroyed , true ) ;
480+ } )
481+ ) ;
482+ }
483+
484+ {
485+ const r = Readable . from ( [ 'foo' , 'bar' , 'baz' ] ) ;
486+ pipeline (
487+ r ,
488+ Duplex . from ( async function ( asyncGenerator ) { } ) ,
489+ common . mustCall ( ( err ) => {
490+ assert . strictEqual ( err . code , 'ERR_STREAM_PREMATURE_CLOSE' ) ;
491+ assert . strictEqual ( r . destroyed , true ) ;
492+ } )
493+ ) ;
494+ }
495+
496+ {
497+ const r = Readable . from ( [ 'foo' , 'bar' , 'baz' ] ) ;
498+ pipeline (
499+ r ,
500+ Duplex . from ( async function ( asyncGenerator ) {
501+ await sleep ( 100 ) ;
502+ } ) ,
503+ common . mustCall ( ( err ) => {
504+ assert . strictEqual ( err . code , 'ERR_STREAM_PREMATURE_CLOSE' ) ;
505+ assert . strictEqual ( r . destroyed , true ) ;
506+ } )
507+ ) ;
508+ }
509+
510+ {
511+ const r = Readable . from ( [ 'foo' , 'bar' , 'baz' ] ) ;
512+ const d = Duplex . from ( async function ( asyncGenerator ) {
513+ while ( ! ( await asyncGenerator . next ( ) ) . done ) await sleep ( 100 ) ;
514+ } ) ;
515+
516+ setTimeout ( ( ) => d . destroy ( ) , 150 ) ;
517+
518+ pipeline (
519+ r ,
520+ d ,
521+ common . mustCall ( ( err ) => {
522+ assert . strictEqual ( err . code , 'ERR_STREAM_PREMATURE_CLOSE' ) ;
523+ assert . strictEqual ( r . destroyed , true ) ;
524+ } )
525+ ) ;
526+ }
527+
528+ {
529+ const r = Duplex . from ( async function * ( ) {
530+ for ( const value of [ 'foo' , 'bar' , 'baz' ] ) {
531+ await sleep ( 50 ) ;
532+ yield value ;
533+ }
534+ } ) ;
535+ const d = Duplex . from ( async function ( asyncGenerator ) {
536+ while ( ! ( await asyncGenerator . next ( ) ) . done ) ;
537+ } ) ;
538+
539+ setTimeout ( ( ) => r . destroy ( ) , 75 ) ;
540+
541+ pipeline (
542+ r ,
543+ d ,
544+ common . mustCall ( ( err ) => {
545+ assert . strictEqual ( err . code , 'ERR_STREAM_PREMATURE_CLOSE' ) ;
546+ assert . strictEqual ( r . destroyed , true ) ;
547+ assert . strictEqual ( d . destroyed , true ) ;
548+ } )
549+ ) ;
550+ }
551+
552+ {
553+ const r = Readable . from ( [ 'foo' ] ) ;
554+ pipeline (
555+ r ,
556+ Duplex . from ( async function ( asyncGenerator ) {
557+ await asyncGenerator . throw ( new Error ( 'my error' ) ) ;
558+ } ) ,
559+ common . mustCall ( ( err ) => {
560+ assert . strictEqual ( err . message , 'my error' ) ;
561+ assert . strictEqual ( r . destroyed , true ) ;
562+ } )
563+ ) ;
564+ }
565+
566+ {
567+ const r = Readable . from ( [ 'foo' , 'bar' ] ) ;
568+ pipeline (
569+ r ,
570+ Duplex . from ( async function ( asyncGenerator ) {
571+ await asyncGenerator . next ( ) ;
572+ await asyncGenerator . throw ( new Error ( 'my error' ) ) ;
573+ } ) ,
574+ common . mustCall ( ( err ) => {
575+ assert . strictEqual ( err . message , 'my error' ) ;
576+ assert . strictEqual ( r . destroyed , true ) ;
577+ } )
578+ ) ;
579+ }
580+
581+ {
582+ const r = Readable . from ( [ 'foo' , 'bar' ] ) ;
583+ pipeline (
584+ r ,
585+ Duplex . from ( async function ( asyncGenerator ) {
586+ await asyncGenerator . next ( ) ;
587+ await asyncGenerator . throw ( ) ;
588+ } ) ,
589+ common . mustCall ( ( err ) => {
590+ assert . strictEqual ( err . code , 'ABORT_ERR' ) ;
591+ assert . strictEqual ( r . destroyed , true ) ;
592+ } )
593+ ) ;
594+ }
0 commit comments