2626'use strict' ;
2727
2828const {
29- Array,
3029 FunctionPrototype,
3130 ObjectDefineProperty,
3231 ObjectDefineProperties,
@@ -150,8 +149,7 @@ function WritableState(options, stream, isDuplex) {
150149 // synchronous _write() completion.
151150 this . afterWriteTickInfo = null ;
152151
153- this . bufferedRequest = null ;
154- this . lastBufferedRequest = null ;
152+ resetBuffer ( this ) ;
155153
156154 // Number of pending user-supplied write callbacks
157155 // this must be 0 before 'finish' can be emitted
@@ -177,27 +175,25 @@ function WritableState(options, stream, isDuplex) {
177175
178176 // Indicates whether the stream has finished destroying.
179177 this . closed = false ;
178+ }
180179
181- // Count buffered requests
182- this . bufferedRequestCount = 0 ;
183-
184- // Allocate the first CorkedRequest, there is always
185- // one allocated and free to use, and we maintain at most two
186- const corkReq = { next : null , entry : null , finish : undefined } ;
187- corkReq . finish = onCorkedFinish . bind ( undefined , corkReq , this ) ;
188- this . corkedRequestsFree = corkReq ;
180+ function resetBuffer ( state ) {
181+ state . buffered = [ ] ;
182+ state . bufferedIndex = 0 ;
183+ state . allBuffers = true ;
184+ state . allNoop = true ;
189185}
190186
191187WritableState . prototype . getBuffer = function getBuffer ( ) {
192- let current = this . bufferedRequest ;
193- const out = [ ] ;
194- while ( current ) {
195- out . push ( current ) ;
196- current = current . next ;
197- }
198- return out ;
188+ return this . buffered . slice ( this . bufferedIndex ) ;
199189} ;
200190
191+ ObjectDefineProperty ( WritableState . prototype , 'bufferedRequestCount' , {
192+ get ( ) {
193+ return this . buffered . length - this . bufferedIndex ;
194+ }
195+ } ) ;
196+
201197// Test _writableState for inheritance to account for Duplex streams,
202198// whose prototype chain only points to Readable.
203199let realHasInstance ;
@@ -318,10 +314,7 @@ Writable.prototype.uncork = function() {
318314 if ( state . corked ) {
319315 state . corked -- ;
320316
321- if ( ! state . writing &&
322- ! state . corked &&
323- ! state . bufferProcessing &&
324- state . bufferedRequest )
317+ if ( ! state . writing )
325318 clearBuffer ( this , state ) ;
326319 }
327320} ;
@@ -339,7 +332,7 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
339332// If we're already writing something, then just put this
340333// in the queue, and wait our turn. Otherwise, call _write
341334// If we return false, then we need a drain event, so set that flag.
342- function writeOrBuffer ( stream , state , chunk , encoding , cb ) {
335+ function writeOrBuffer ( stream , state , chunk , encoding , callback ) {
343336 const len = state . objectMode ? 1 : chunk . length ;
344337
345338 state . length += len ;
@@ -350,22 +343,16 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
350343 state . needDrain = true ;
351344
352345 if ( state . writing || state . corked || state . errored ) {
353- const last = state . lastBufferedRequest ;
354- state . lastBufferedRequest = {
355- chunk,
356- encoding,
357- callback : cb ,
358- next : null
359- } ;
360- if ( last ) {
361- last . next = state . lastBufferedRequest ;
362- } else {
363- state . bufferedRequest = state . lastBufferedRequest ;
346+ state . buffered . push ( { chunk, encoding, callback } ) ;
347+ if ( state . allBuffers && encoding !== 'buffer' ) {
348+ state . allBuffers = false ;
349+ }
350+ if ( state . allNoop && callback !== nop ) {
351+ state . allNoop = false ;
364352 }
365- state . bufferedRequestCount += 1 ;
366353 } else {
367354 state . writelen = len ;
368- state . writecb = cb ;
355+ state . writecb = callback ;
369356 state . writing = true ;
370357 state . sync = true ;
371358 stream . _write ( chunk , encoding , state . onwrite ) ;
@@ -427,30 +414,27 @@ function onwrite(stream, er) {
427414 onwriteError ( stream , state , er , cb ) ;
428415 }
429416 } else {
430- // Check if we're actually ready to finish, but don't emit yet
431- const finished = needFinish ( state ) || stream . destroyed ;
432-
433- if ( ! finished &&
434- ! state . corked &&
435- ! state . bufferProcessing &&
436- state . bufferedRequest ) {
417+ if ( ! state . destroyed ) {
437418 clearBuffer ( stream , state ) ;
438419 }
439-
440- if ( sync ) {
441- // It is a common case that the callback passed to .write() is always
442- // the same. In that case, we do not schedule a new nextTick(), but rather
443- // just increase a counter, to improve performance and avoid memory
444- // allocations.
445- if ( state . afterWriteTickInfo !== null &&
446- state . afterWriteTickInfo . cb === cb ) {
447- state . afterWriteTickInfo . count ++ ;
420+ if ( state . needDrain || cb !== nop || state . ending || state . destroyed ) {
421+ if ( sync ) {
422+ // It is a common case that the callback passed to .write() is always
423+ // the same. In that case, we do not schedule a new nextTick(), but
424+ // rather just increase a counter, to improve performance and avoid
425+ // memory allocations.
426+ if ( state . afterWriteTickInfo !== null &&
427+ state . afterWriteTickInfo . cb === cb ) {
428+ state . afterWriteTickInfo . count ++ ;
429+ } else {
430+ state . afterWriteTickInfo = { count : 1 , cb, stream, state } ;
431+ process . nextTick ( afterWriteTick , state . afterWriteTickInfo ) ;
432+ }
448433 } else {
449- state . afterWriteTickInfo = { count : 1 , cb, stream, state } ;
450- process . nextTick ( afterWriteTick , state . afterWriteTickInfo ) ;
434+ afterWrite ( stream , state , 1 , cb ) ;
451435 }
452436 } else {
453- afterWrite ( stream , state , 1 , cb ) ;
437+ state . pendingcb -- ;
454438 }
455439 }
456440}
@@ -482,83 +466,69 @@ function afterWrite(stream, state, count, cb) {
482466
483467// If there's something in the buffer waiting, then invoke callbacks.
484468function errorBuffer ( state , err ) {
485- if ( state . writing || ! state . bufferedRequest ) {
469+ if ( state . writing ) {
486470 return ;
487471 }
488472
489- for ( let entry = state . bufferedRequest ; entry ; entry = entry . next ) {
490- const len = state . objectMode ? 1 : entry . chunk . length ;
473+ for ( let n = state . bufferedIndex ; n < state . buffered . length ; ++ n ) {
474+ const { chunk, callback } = state . buffered [ n ] ;
475+ const len = state . objectMode ? 1 : chunk . length ;
491476 state . length -= len ;
492- entry . callback ( err ) ;
477+ callback ( err ) ;
493478 }
494- state . bufferedRequest = null ;
495- state . lastBufferedRequest = null ;
496- state . bufferedRequestCount = 0 ;
479+
480+ resetBuffer ( state ) ;
497481}
498482
499483// If there's something in the buffer waiting, then process it
500484function clearBuffer ( stream , state ) {
485+ if ( state . corked || state . bufferProcessing ) {
486+ return ;
487+ }
488+
489+ const { buffered, bufferedIndex, objectMode } = state ;
490+ const bufferedLength = buffered . length - bufferedIndex ;
491+
492+ if ( ! bufferedLength ) {
493+ return ;
494+ }
495+
496+ let i = bufferedIndex ;
497+
501498 state . bufferProcessing = true ;
502- let entry = state . bufferedRequest ;
503-
504- if ( stream . _writev && entry && entry . next ) {
505- // Fast case, write everything using _writev()
506- const l = state . bufferedRequestCount ;
507- const buffer = new Array ( l ) ;
508- const holder = state . corkedRequestsFree ;
509- holder . entry = entry ;
510-
511- let count = 0 ;
512- let allBuffers = true ;
513- while ( entry ) {
514- buffer [ count ] = entry ;
515- if ( entry . encoding !== 'buffer' )
516- allBuffers = false ;
517- entry = entry . next ;
518- count += 1 ;
519- }
520- buffer . allBuffers = allBuffers ;
499+ if ( bufferedLength > 1 && stream . _writev ) {
500+ state . pendingcb -= bufferedLength - 1 ;
501+
502+ const callback = state . allNoop ? nop : ( err ) => {
503+ for ( let n = i ; n < buffered . length ; ++ n ) {
504+ buffered [ n ] . callback ( err ) ;
505+ }
506+ } ;
507+ // Make a copy of `buffered` if it's going to be used by `callback` above,
508+ // since `doWrite` will mutate the array.
509+ const chunks = state . allNoop && i === 0 ? buffered : buffered . slice ( i ) ;
510+ chunks . allBuffers = state . allBuffers ;
521511
522- doWrite ( stream , state , true , state . length , buffer , '' , holder . finish ) ;
512+ doWrite ( stream , state , true , state . length , chunks , '' , callback ) ;
523513
524- // doWrite is almost always async, defer these to save a bit of time
525- // as the hot path ends with doWrite
526- state . pendingcb ++ ;
527- state . lastBufferedRequest = null ;
528- if ( holder . next ) {
529- state . corkedRequestsFree = holder . next ;
530- holder . next = null ;
531- } else {
532- const corkReq = { next : null , entry : null , finish : undefined } ;
533- corkReq . finish = onCorkedFinish . bind ( undefined , corkReq , state ) ;
534- state . corkedRequestsFree = corkReq ;
535- }
536- state . bufferedRequestCount = 0 ;
514+ resetBuffer ( state ) ;
537515 } else {
538- // Slow case, write chunks one-by-one
539- while ( entry ) {
540- const chunk = entry . chunk ;
541- const encoding = entry . encoding ;
542- const cb = entry . callback ;
543- const len = state . objectMode ? 1 : chunk . length ;
544-
545- doWrite ( stream , state , false , len , chunk , encoding , cb ) ;
546- entry = entry . next ;
547- state . bufferedRequestCount -- ;
548- // If we didn't call the onwrite immediately, then
549- // it means that we need to wait until it does.
550- // also, that means that the chunk and cb are currently
551- // being processed, so move the buffer counter past them.
552- if ( state . writing ) {
553- break ;
554- }
516+ do {
517+ const { chunk, encoding, callback } = buffered [ i ] ;
518+ buffered [ i ++ ] = null ;
519+ const len = objectMode ? 1 : chunk . length ;
520+ doWrite ( stream , state , false , len , chunk , encoding , callback ) ;
521+ } while ( i < buffered . length && ! state . writing ) ;
522+
523+ if ( i === buffered . length ) {
524+ resetBuffer ( state ) ;
525+ } else if ( i > 256 ) {
526+ buffered . splice ( 0 , i ) ;
527+ state . bufferedIndex = 0 ;
528+ } else {
529+ state . bufferedIndex = i ;
555530 }
556-
557- if ( entry === null )
558- state . lastBufferedRequest = null ;
559531 }
560-
561- state . bufferedRequest = entry ;
562532 state . bufferProcessing = false ;
563533}
564534
@@ -622,7 +592,7 @@ function needFinish(state) {
622592 return ( state . ending &&
623593 state . length === 0 &&
624594 ! state . errored &&
625- state . bufferedRequest === null &&
595+ state . buffered . length === 0 &&
626596 ! state . finished &&
627597 ! state . writing ) ;
628598}
@@ -693,20 +663,6 @@ function finish(stream, state) {
693663 }
694664}
695665
696- function onCorkedFinish ( corkReq , state , err ) {
697- let entry = corkReq . entry ;
698- corkReq . entry = null ;
699- while ( entry ) {
700- const cb = entry . callback ;
701- state . pendingcb -- ;
702- cb ( err ) ;
703- entry = entry . next ;
704- }
705-
706- // Reuse the free corkReq.
707- state . corkedRequestsFree . next = corkReq ;
708- }
709-
710666// TODO(ronag): Avoid using events to implement internal logic.
711667function onFinished ( stream , state , cb ) {
712668 function onerror ( err ) {
0 commit comments