@@ -716,35 +716,39 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
716716 ondrain ( ) ;
717717 }
718718
719+ function pause ( ) {
720+ // If the user unpiped during `dest.write()`, it is possible
721+ // to get stuck in a permanently paused state if that write
722+ // also returned false.
723+ // => Check whether `dest` is still a piping destination.
724+ if ( ! cleanedUp ) {
725+ if ( state . pipes . length === 1 && state . pipes [ 0 ] === dest ) {
726+ debug ( 'false write response, pause' , 0 ) ;
727+ state . awaitDrainWriters = dest ;
728+ state . multiAwaitDrain = false ;
729+ } else if ( state . pipes . length > 1 && state . pipes . includes ( dest ) ) {
730+ debug ( 'false write response, pause' , state . awaitDrainWriters . size ) ;
731+ state . awaitDrainWriters . add ( dest ) ;
732+ }
733+ src . pause ( ) ;
734+ }
735+ if ( ! ondrain ) {
736+ // When the dest drains, it reduces the awaitDrain counter
737+ // on the source. This would be more elegant with a .once()
738+ // handler in flow(), but adding and removing repeatedly is
739+ // too slow.
740+ ondrain = pipeOnDrain ( src , dest ) ;
741+ dest . on ( 'drain' , ondrain ) ;
742+ }
743+ }
744+
719745 src . on ( 'data' , ondata ) ;
720746 function ondata ( chunk ) {
721747 debug ( 'ondata' ) ;
722748 const ret = dest . write ( chunk ) ;
723749 debug ( 'dest.write' , ret ) ;
724750 if ( ret === false ) {
725- // If the user unpiped during `dest.write()`, it is possible
726- // to get stuck in a permanently paused state if that write
727- // also returned false.
728- // => Check whether `dest` is still a piping destination.
729- if ( ! cleanedUp ) {
730- if ( state . pipes . length === 1 && state . pipes [ 0 ] === dest ) {
731- debug ( 'false write response, pause' , 0 ) ;
732- state . awaitDrainWriters = dest ;
733- state . multiAwaitDrain = false ;
734- } else if ( state . pipes . length > 1 && state . pipes . includes ( dest ) ) {
735- debug ( 'false write response, pause' , state . awaitDrainWriters . size ) ;
736- state . awaitDrainWriters . add ( dest ) ;
737- }
738- src . pause ( ) ;
739- }
740- if ( ! ondrain ) {
741- // When the dest drains, it reduces the awaitDrain counter
742- // on the source. This would be more elegant with a .once()
743- // handler in flow(), but adding and removing repeatedly is
744- // too slow.
745- ondrain = pipeOnDrain ( src , dest ) ;
746- dest . on ( 'drain' , ondrain ) ;
747- }
751+ pause ( ) ;
748752 }
749753 }
750754
@@ -793,7 +797,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
793797
794798 if ( dest . writableNeedDrain === true ) {
795799 if ( state . flowing ) {
796- src . pause ( ) ;
800+ pause ( ) ;
797801 }
798802 } else if ( ! state . flowing ) {
799803 debug ( 'pipe resume' ) ;
0 commit comments