55 FunctionPrototypeCall,
66 ObjectDefineProperties,
77 PromisePrototypeThen,
8- PromiseResolve,
98 ReflectConstruct,
109 SymbolToStringTag,
1110 Symbol,
@@ -47,6 +46,7 @@ const {
4746 nonOpFlush,
4847 kType,
4948 kState,
49+ nonOpCancel,
5050} = require ( 'internal/webstreams/util' ) ;
5151
5252const {
@@ -377,8 +377,7 @@ function initializeTransformStream(
377377 return transformStreamDefaultSourcePullAlgorithm ( stream ) ;
378378 } ,
379379 cancel ( reason ) {
380- transformStreamErrorWritableAndUnblockWrite ( stream , reason ) ;
381- return PromiseResolve ( ) ;
380+ return transformStreamDefaultSourceCancelAlgorithm ( stream , reason ) ;
382381 } ,
383382 } , {
384383 highWaterMark : readableHighWaterMark ,
@@ -420,6 +419,10 @@ function transformStreamErrorWritableAndUnblockWrite(stream, error) {
420419 writableStreamDefaultControllerErrorIfNeeded (
421420 writable [ kState ] . controller ,
422421 error ) ;
422+ transformStreamUnblockWrite ( stream ) ;
423+ }
424+
425+ function transformStreamUnblockWrite ( stream ) {
423426 if ( stream [ kState ] . backpressure )
424427 transformStreamSetBackpressure ( stream , false ) ;
425428}
@@ -436,13 +439,15 @@ function setupTransformStreamDefaultController(
436439 stream ,
437440 controller ,
438441 transformAlgorithm ,
439- flushAlgorithm ) {
442+ flushAlgorithm ,
443+ cancelAlgorithm ) {
440444 assert ( isTransformStream ( stream ) ) ;
441445 assert ( stream [ kState ] . controller === undefined ) ;
442446 controller [ kState ] = {
443447 stream,
444448 transformAlgorithm,
445449 flushAlgorithm,
450+ cancelAlgorithm,
446451 } ;
447452 stream [ kState ] . controller = controller ;
448453}
@@ -453,21 +458,26 @@ function setupTransformStreamDefaultControllerFromTransformer(
453458 const controller = new TransformStreamDefaultController ( kSkipThrow ) ;
454459 const transform = transformer ?. transform || defaultTransformAlgorithm ;
455460 const flush = transformer ?. flush || nonOpFlush ;
461+ const cancel = transformer ?. cancel || nonOpCancel ;
456462 const transformAlgorithm =
457463 FunctionPrototypeBind ( transform , transformer ) ;
458464 const flushAlgorithm =
459465 FunctionPrototypeBind ( flush , transformer ) ;
466+ const cancelAlgorithm =
467+ FunctionPrototypeBind ( cancel , transformer ) ;
460468
461469 setupTransformStreamDefaultController (
462470 stream ,
463471 controller ,
464472 transformAlgorithm ,
465- flushAlgorithm ) ;
473+ flushAlgorithm ,
474+ cancelAlgorithm ) ;
466475}
467476
468477function transformStreamDefaultControllerClearAlgorithms ( controller ) {
469478 controller [ kState ] . transformAlgorithm = undefined ;
470479 controller [ kState ] . flushAlgorithm = undefined ;
480+ controller [ kState ] . cancelAlgorithm = undefined ;
471481}
472482
473483function transformStreamDefaultControllerEnqueue ( controller , chunk ) {
@@ -556,7 +566,40 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
556566}
557567
558568async function transformStreamDefaultSinkAbortAlgorithm ( stream , reason ) {
559- transformStreamError ( stream , reason ) ;
569+ const {
570+ controller,
571+ readable,
572+ } = stream [ kState ] ;
573+
574+ if ( controller [ kState ] . finishPromise !== undefined ) {
575+ return controller [ kState ] . finishPromise ;
576+ }
577+
578+ const { promise, resolve, reject } = createDeferredPromise ( ) ;
579+ controller [ kState ] . finishPromise = promise ;
580+ const cancelPromise = ensureIsPromise (
581+ controller [ kState ] . cancelAlgorithm ,
582+ controller ,
583+ reason ) ;
584+ transformStreamDefaultControllerClearAlgorithms ( controller ) ;
585+
586+ PromisePrototypeThen (
587+ cancelPromise ,
588+ ( ) => {
589+ if ( readable [ kState ] . state === 'errored' )
590+ reject ( readable [ kState ] . storedError ) ;
591+ else {
592+ readableStreamDefaultControllerError ( readable [ kState ] . controller , reason ) ;
593+ resolve ( ) ;
594+ }
595+ } ,
596+ ( error ) => {
597+ readableStreamDefaultControllerError ( readable [ kState ] . controller , error ) ;
598+ reject ( error ) ;
599+ } ,
600+ ) ;
601+
602+ return controller [ kState ] . finishPromise ;
560603}
561604
562605function transformStreamDefaultSinkCloseAlgorithm ( stream ) {
@@ -565,23 +608,32 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) {
565608 controller,
566609 } = stream [ kState ] ;
567610
611+ if ( controller [ kState ] . finishPromise !== undefined ) {
612+ return controller [ kState ] . finishPromise ;
613+ }
614+ const { promise, resolve, reject } = createDeferredPromise ( ) ;
615+ controller [ kState ] . finishPromise = promise ;
568616 const flushPromise =
569617 ensureIsPromise (
570618 controller [ kState ] . flushAlgorithm ,
571619 controller ,
572620 controller ) ;
573621 transformStreamDefaultControllerClearAlgorithms ( controller ) ;
574- return PromisePrototypeThen (
622+ PromisePrototypeThen (
575623 flushPromise ,
576624 ( ) => {
577625 if ( readable [ kState ] . state === 'errored' )
578- throw readable [ kState ] . storedError ;
579- readableStreamDefaultControllerClose ( readable [ kState ] . controller ) ;
626+ reject ( readable [ kState ] . storedError ) ;
627+ else {
628+ readableStreamDefaultControllerClose ( readable [ kState ] . controller ) ;
629+ resolve ( ) ;
630+ }
580631 } ,
581632 ( error ) => {
582- transformStreamError ( stream , error ) ;
583- throw readable [ kState ] . storedError ;
633+ readableStreamDefaultControllerError ( readable [ kState ] . controller , error ) ;
634+ reject ( error ) ;
584635 } ) ;
636+ return controller [ kState ] . finishPromise ;
585637}
586638
587639function transformStreamDefaultSourcePullAlgorithm ( stream ) {
@@ -591,6 +643,48 @@ function transformStreamDefaultSourcePullAlgorithm(stream) {
591643 return stream [ kState ] . backpressureChange . promise ;
592644}
593645
646+ function transformStreamDefaultSourceCancelAlgorithm ( stream , reason ) {
647+ const {
648+ controller,
649+ writable,
650+ } = stream [ kState ] ;
651+
652+ if ( controller [ kState ] . finishPromise !== undefined ) {
653+ return controller [ kState ] . finishPromise ;
654+ }
655+
656+ const { promise, resolve, reject } = createDeferredPromise ( ) ;
657+ controller [ kState ] . finishPromise = promise ;
658+ const cancelPromise = ensureIsPromise (
659+ controller [ kState ] . cancelAlgorithm ,
660+ controller ,
661+ reason ) ;
662+ transformStreamDefaultControllerClearAlgorithms ( controller ) ;
663+
664+ PromisePrototypeThen ( cancelPromise ,
665+ ( ) => {
666+ if ( writable [ kState ] . state === 'errored' )
667+ reject ( writable [ kState ] . storedError ) ;
668+ else {
669+ writableStreamDefaultControllerErrorIfNeeded (
670+ writable [ kState ] . controller ,
671+ reason ) ;
672+ transformStreamUnblockWrite ( stream ) ;
673+ resolve ( ) ;
674+ }
675+ } ,
676+ ( error ) => {
677+ writableStreamDefaultControllerErrorIfNeeded (
678+ writable [ kState ] . controller ,
679+ error ) ;
680+ transformStreamUnblockWrite ( stream ) ;
681+ reject ( error ) ;
682+ } ,
683+ ) ;
684+
685+ return controller [ kState ] . finishPromise ;
686+ }
687+
594688module . exports = {
595689 TransformStream,
596690 TransformStreamDefaultController,
0 commit comments