@@ -115,6 +115,7 @@ const kHasFlowing = 1 << 23;
115115const kFlowing = 1 << 24 ;
116116const kHasPaused = 1 << 25 ;
117117const kPaused = 1 << 26 ;
118+ const kDataListening = 1 << 27 ;
118119
119120// TODO(benjamingr) it is likely slower to do it this way than with free functions
120121function makeBitMapDescriptor ( bit ) {
@@ -531,8 +532,7 @@ function canPushMore(state) {
531532}
532533
533534function addChunk ( stream , state , chunk , addToFront ) {
534- if ( ( state [ kState ] & ( kFlowing | kSync ) ) === kFlowing && state . length === 0 &&
535- stream . listenerCount ( 'data' ) > 0 ) {
535+ if ( ( state [ kState ] & ( kFlowing | kSync | kDataListening ) ) === ( kFlowing | kDataListening ) && state . length === 0 ) {
536536 // Use the guard to avoid creating `Set()` repeatedly
537537 // when we have multiple pipes.
538538 if ( ( state [ kState ] & kMultiAwaitDrain ) !== 0 ) {
@@ -1058,7 +1058,7 @@ function pipeOnDrain(src, dest) {
10581058 }
10591059
10601060 if ( ( ! state . awaitDrainWriters || state . awaitDrainWriters . size === 0 ) &&
1061- src . listenerCount ( 'data' ) ) {
1061+ ( state [ kState ] & kDataListening ) !== 0 ) {
10621062 src . resume ( ) ;
10631063 }
10641064 } ;
@@ -1105,6 +1105,8 @@ Readable.prototype.on = function(ev, fn) {
11051105 const state = this . _readableState ;
11061106
11071107 if ( ev === 'data' ) {
1108+ state [ kState ] |= kDataListening ;
1109+
11081110 // Update readableListening so that resume() may be a no-op
11091111 // a few lines down. This is needed to support once('readable').
11101112 state . readableListening = this . listenerCount ( 'readable' ) > 0 ;
@@ -1131,6 +1133,8 @@ Readable.prototype.on = function(ev, fn) {
11311133Readable . prototype . addListener = Readable . prototype . on ;
11321134
11331135Readable . prototype . removeListener = function ( ev , fn ) {
1136+ const state = this . _readableState ;
1137+
11341138 const res = Stream . prototype . removeListener . call ( this ,
11351139 ev , fn ) ;
11361140
@@ -1142,6 +1146,8 @@ Readable.prototype.removeListener = function(ev, fn) {
11421146 // resume within the same tick will have no
11431147 // effect.
11441148 process . nextTick ( updateReadableListening , this ) ;
1149+ } else if ( ev === 'data' && this . listenerCount ( 'data' ) === 0 ) {
1150+ state [ kState ] &= ~ kDataListening ;
11451151 }
11461152
11471153 return res ;
@@ -1175,7 +1181,7 @@ function updateReadableListening(self) {
11751181 state [ kState ] |= kHasFlowing | kFlowing ;
11761182
11771183 // Crude way to check if we should resume.
1178- } else if ( self . listenerCount ( 'data' ) > 0 ) {
1184+ } else if ( ( state [ kState ] & kDataListening ) !== 0 ) {
11791185 self . resume ( ) ;
11801186 } else if ( ( state [ kState ] & kReadableListening ) === 0 ) {
11811187 state [ kState ] &= ~ ( kHasFlowing | kFlowing ) ;
0 commit comments