@@ -113,6 +113,7 @@ function ReadableState(options, stream, isDuplex) {
113113 this . emittedReadable = false ;
114114 this . readableListening = false ;
115115 this . resumeScheduled = false ;
116+ this . paused = true ;
116117
117118 // Should close be emitted on destroy. Defaults to true.
118119 this . emitClose = options . emitClose !== false ;
@@ -858,10 +859,16 @@ Readable.prototype.removeAllListeners = function(ev) {
858859} ;
859860
860861function updateReadableListening ( self ) {
861- self . _readableState . readableListening = self . listenerCount ( 'readable' ) > 0 ;
862+ const state = self . _readableState ;
863+ state . readableListening = self . listenerCount ( 'readable' ) > 0 ;
862864
863- // crude way to check if we should resume
864- if ( self . listenerCount ( 'data' ) > 0 ) {
865+ if ( state . resumeScheduled && ! state . paused ) {
866+ // flowing needs to be set to true now, otherwise
867+ // the upcoming resume will not flow.
868+ state . flowing = true ;
869+
870+ // crude way to check if we should resume
871+ } else if ( self . listenerCount ( 'data' ) > 0 ) {
865872 self . resume ( ) ;
866873 }
867874}
@@ -883,6 +890,7 @@ Readable.prototype.resume = function() {
883890 state . flowing = ! state . readableListening ;
884891 resume ( this , state ) ;
885892 }
893+ state . paused = false ;
886894 return this ;
887895} ;
888896
@@ -913,6 +921,7 @@ Readable.prototype.pause = function() {
913921 this . _readableState . flowing = false ;
914922 this . emit ( 'pause' ) ;
915923 }
924+ this . _readableState . paused = true ;
916925 return this ;
917926} ;
918927
0 commit comments