@@ -435,7 +435,7 @@ function readableAddChunkUnshiftObjectMode(stream, state, chunk) {
435435function readableAddChunkUnshiftValue ( stream , state , chunk ) {
436436 if ( ( state [ kState ] & kEndEmitted ) !== 0 )
437437 errorOrDestroy ( stream , new ERR_STREAM_UNSHIFT_AFTER_END_EVENT ( ) ) ;
438- else if ( state . destroyed || state . errored )
438+ else if ( ( state [ kState ] & ( kDestroyed | kErrored ) ) !== 0 )
439439 return false ;
440440 else
441441 addChunk ( stream , state , chunk , true ) ;
@@ -604,7 +604,7 @@ function computeNewHighWaterMark(n) {
604604// This function is designed to be inlinable, so please take care when making
605605// changes to the function body.
606606function howMuchToRead ( n , state ) {
607- if ( n <= 0 || ( state . length === 0 && state . ended ) )
607+ if ( n <= 0 || ( state . length === 0 && ( state [ kState ] & kEnded ) !== 0 ) )
608608 return 0 ;
609609 if ( ( state [ kState ] & kObjectMode ) !== 0 )
610610 return 1 ;
@@ -648,7 +648,7 @@ Readable.prototype.read = function(n) {
648648 state . length >= state . highWaterMark :
649649 state . length > 0 ) ||
650650 ( state [ kState ] & kEnded ) !== 0 ) ) {
651- debug ( 'read: emitReadable' , state . length , ( state [ kState ] & kEnded ) !== 0 ) ;
651+ debug ( 'read: emitReadable' ) ;
652652 if ( state . length === 0 && ( state [ kState ] & kEnded ) !== 0 )
653653 endReadable ( this ) ;
654654 else
@@ -806,7 +806,7 @@ function emitReadable(stream) {
806806function emitReadable_ ( stream ) {
807807 const state = stream . _readableState ;
808808 debug ( 'emitReadable_' ) ;
809- if ( ( state [ kState ] & ( kDestroyed | kErrored ) ) === 0 && ( state . length || state . ended ) ) {
809+ if ( ( state [ kState ] & ( kDestroyed | kErrored ) ) === 0 && ( state . length || ( state [ kState ] & kEnded ) !== 0 ) ) {
810810 stream . emit ( 'readable' ) ;
811811 state [ kState ] &= ~ kEmittedReadable ;
812812 }
@@ -887,7 +887,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
887887 const state = this . _readableState ;
888888
889889 if ( state . pipes . length === 1 ) {
890- if ( ! state . multiAwaitDrain ) {
890+ if ( ( state [ kState ] & kMultiAwaitDrain ) === 0 ) {
891891 state [ kState ] |= kMultiAwaitDrain ;
892892 state . awaitDrainWriters = new SafeSet (
893893 state . awaitDrainWriters ? [ state . awaitDrainWriters ] : [ ] ,
@@ -903,7 +903,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
903903 dest !== process . stderr ;
904904
905905 const endFn = doEnd ? onend : unpipe ;
906- if ( state . endEmitted )
906+ if ( ( state [ kState ] & kEndEmitted ) !== 0 )
907907 process . nextTick ( endFn ) ;
908908 else
909909 src . once ( 'end' , endFn ) ;
@@ -962,7 +962,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
962962 if ( state . pipes . length === 1 && state . pipes [ 0 ] === dest ) {
963963 debug ( 'false write response, pause' , 0 ) ;
964964 state . awaitDrainWriters = dest ;
965- state . multiAwaitDrain = false ;
965+ state [ kState ] &= ~ kMultiAwaitDrain ;
966966 } else if ( state . pipes . length > 1 && state . pipes . includes ( dest ) ) {
967967 debug ( 'false write response, pause' , state . awaitDrainWriters . size ) ;
968968 state . awaitDrainWriters . add ( dest ) ;
@@ -1034,7 +1034,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
10341034
10351035 if ( dest . writableNeedDrain === true ) {
10361036 pause ( ) ;
1037- } else if ( ! state . flowing ) {
1037+ } else if ( ( state [ kState ] & kFlowing ) === 0 ) {
10381038 debug ( 'pipe resume' ) ;
10391039 src . resume ( ) ;
10401040 }
@@ -1052,7 +1052,7 @@ function pipeOnDrain(src, dest) {
10521052 if ( state . awaitDrainWriters === dest ) {
10531053 debug ( 'pipeOnDrain' , 1 ) ;
10541054 state . awaitDrainWriters = null ;
1055- } else if ( state . multiAwaitDrain ) {
1055+ } else if ( ( state [ kState ] & kMultiAwaitDrain ) !== 0 ) {
10561056 debug ( 'pipeOnDrain' , state . awaitDrainWriters . size ) ;
10571057 state . awaitDrainWriters . delete ( dest ) ;
10581058 }
@@ -1107,20 +1107,20 @@ Readable.prototype.on = function(ev, fn) {
11071107 if ( ev === 'data' ) {
11081108 // Update readableListening so that resume() may be a no-op
11091109 // a few lines down. This is needed to support once('readable').
1110- state . readableListening = this . listenerCount ( 'readable' ) > 0 ;
1110+ state [ kState ] | = this . listenerCount ( 'readable' ) > 0 ? kReadableListening : 0 ;
11111111
11121112 // Try start flowing on next tick if stream isn't explicitly paused.
1113- if ( state . flowing !== false )
1113+ if ( ( state [ kState ] & ( kHasFlowing | kFlowing ) ) !== kHasFlowing ) {
11141114 this . resume ( ) ;
1115+ }
11151116 } else if ( ev === 'readable' ) {
1116- if ( ! state . endEmitted && ! state . readableListening ) {
1117- state . readableListening = state . needReadable = true ;
1118- state . flowing = false ;
1119- state . emittedReadable = false ;
1120- debug ( 'on readable' , state . length , state . reading ) ;
1117+ if ( ( state [ kState ] & ( kEndEmitted | kReadableListening ) ) === 0 ) {
1118+ state [ kState ] |= kReadableListening | kNeedReadable | kHasFlowing ;
1119+ state [ kState ] &= ~ ( kFlowing | kEmittedReadable ) ;
1120+ debug ( 'on readable' ) ;
11211121 if ( state . length ) {
11221122 emitReadable ( this ) ;
1123- } else if ( ! state . reading ) {
1123+ } else if ( ( state [ kState ] & kReading ) === 0 ) {
11241124 process . nextTick ( nReadingNextTick , this ) ;
11251125 }
11261126 }
@@ -1167,7 +1167,12 @@ Readable.prototype.removeAllListeners = function(ev) {
11671167
11681168function updateReadableListening ( self ) {
11691169 const state = self . _readableState ;
1170- state . readableListening = self . listenerCount ( 'readable' ) > 0 ;
1170+
1171+ if ( self . listenerCount ( 'readable' ) > 0 ) {
1172+ state [ kState ] |= kReadableListening ;
1173+ } else {
1174+ state [ kState ] &= ~ kReadableListening ;
1175+ }
11711176
11721177 if ( ( state [ kState ] & ( kHasPaused | kPaused | kResumeScheduled ) ) === ( kHasPaused | kResumeScheduled ) ) {
11731178 // Flowing needs to be set to true now, otherwise
@@ -1197,7 +1202,7 @@ Readable.prototype.resume = function() {
11971202 // for readable, but we still have to call
11981203 // resume().
11991204 state [ kState ] |= kHasFlowing ;
1200- if ( ! state . readableListening ) {
1205+ if ( ( state [ kState ] & kReadableListening ) === 0 ) {
12011206 state [ kState ] |= kFlowing ;
12021207 } else {
12031208 state [ kState ] &= ~ kFlowing ;
@@ -1210,8 +1215,8 @@ Readable.prototype.resume = function() {
12101215} ;
12111216
12121217function resume ( stream , state ) {
1213- if ( ! state . resumeScheduled ) {
1214- state . resumeScheduled = true ;
1218+ if ( ( state [ kState ] & kResumeScheduled ) === 0 ) {
1219+ state [ kState ] |= kResumeScheduled ;
12151220 process . nextTick ( resume_ , stream , state ) ;
12161221 }
12171222}
@@ -1232,7 +1237,7 @@ function resume_(stream, state) {
12321237Readable . prototype . pause = function ( ) {
12331238 const state = this . _readableState ;
12341239 debug ( 'call pause' ) ;
1235- if ( state . flowing !== false ) {
1240+ if ( ( state [ kState ] & ( kHasFlowing | kFlowing ) ) !== kHasFlowing ) {
12361241 debug ( 'pause' ) ;
12371242 state [ kState ] |= kHasFlowing ;
12381243 state [ kState ] &= ~ kFlowing ;
@@ -1572,20 +1577,19 @@ function fromList(n, state) {
15721577function endReadable ( stream ) {
15731578 const state = stream . _readableState ;
15741579
1575- debug ( 'endReadable' , ( state [ kState ] & kEndEmitted ) !== 0 ) ;
1580+ debug ( 'endReadable' ) ;
15761581 if ( ( state [ kState ] & kEndEmitted ) === 0 ) {
15771582 state [ kState ] |= kEnded ;
15781583 process . nextTick ( endReadableNT , state , stream ) ;
15791584 }
15801585}
15811586
15821587function endReadableNT ( state , stream ) {
1583- debug ( 'endReadableNT' , state . endEmitted , state . length ) ;
1588+ debug ( 'endReadableNT' ) ;
15841589
15851590 // Check that we didn't get one last unshift.
1586- if ( ! state . errored && ! state . closeEmitted &&
1587- ! state . endEmitted && state . length === 0 ) {
1588- state . endEmitted = true ;
1591+ if ( ( state [ kState ] & ( kErrored | kCloseEmitted | kEndEmitted ) ) === 0 && state . length === 0 ) {
1592+ state [ kState ] |= kEndEmitted ;
15891593 stream . emit ( 'end' ) ;
15901594
15911595 if ( stream . writable && stream . allowHalfOpen === false ) {
0 commit comments