@@ -64,6 +64,8 @@ ObjectSetPrototypeOf(Writable, Stream);
6464
6565function nop ( ) { }
6666
67+ const kOnFinished = Symbol ( 'kOnFinished' ) ;
68+
6769function WritableState ( options , stream , isDuplex ) {
6870 // Duplex streams are both readable and writable, but share
6971 // the same options object.
@@ -185,6 +187,8 @@ function WritableState(options, stream, isDuplex) {
185187 // True if close has been emitted or would have been emitted
186188 // depending on emitClose.
187189 this . closeEmitted = false ;
190+
191+ this [ kOnFinished ] = [ ] ;
188192}
189193
190194function resetBuffer ( state ) {
@@ -411,7 +415,7 @@ function onwriteError(stream, state, er, cb) {
411415 // not enabled. Passing `er` here doesn't make sense since
412416 // it's related to one specific write, not to the buffered
413417 // writes.
414- errorBuffer ( state , new ERR_STREAM_DESTROYED ( 'write' ) ) ;
418+ errorBuffer ( state ) ;
415419 // This can emit error, but error must always follow cb.
416420 errorOrDestroy ( stream , er ) ;
417421}
@@ -487,14 +491,14 @@ function afterWrite(stream, state, count, cb) {
487491 }
488492
489493 if ( state . destroyed ) {
490- errorBuffer ( state , new ERR_STREAM_DESTROYED ( 'write' ) ) ;
494+ errorBuffer ( state ) ;
491495 }
492496
493497 finishMaybe ( stream , state ) ;
494498}
495499
496500// If there's something in the buffer waiting, then invoke callbacks.
497- function errorBuffer ( state , err ) {
501+ function errorBuffer ( state ) {
498502 if ( state . writing ) {
499503 return ;
500504 }
@@ -503,7 +507,11 @@ function errorBuffer(state, err) {
503507 const { chunk, callback } = state . buffered [ n ] ;
504508 const len = state . objectMode ? 1 : chunk . length ;
505509 state . length -= len ;
506- callback ( err ) ;
510+ callback ( new ERR_STREAM_DESTROYED ( 'write' ) ) ;
511+ }
512+
513+ for ( const callback of state [ kOnFinished ] . splice ( 0 ) ) {
514+ callback ( new ERR_STREAM_DESTROYED ( 'end' ) ) ;
507515 }
508516
509517 resetBuffer ( state ) ;
@@ -611,10 +619,11 @@ Writable.prototype.end = function(chunk, encoding, cb) {
611619 }
612620
613621 if ( typeof cb === 'function' ) {
614- if ( err || state . finished )
622+ if ( err || state . finished ) {
615623 process . nextTick ( cb , err ) ;
616- else
617- onFinished ( this , cb ) ;
624+ } else {
625+ state [ kOnFinished ] . push ( cb ) ;
626+ }
618627 }
619628
620629 return this ;
@@ -636,6 +645,9 @@ function callFinal(stream, state) {
636645 stream . _final ( ( err ) => {
637646 state . pendingcb -- ;
638647 if ( err ) {
648+ for ( const callback of state [ kOnFinished ] . splice ( 0 ) ) {
649+ callback ( err ) ;
650+ }
639651 errorOrDestroy ( stream , err , state . sync ) ;
640652 } else if ( needFinish ( state ) ) {
641653 state . prefinished = true ;
@@ -683,6 +695,11 @@ function finish(stream, state) {
683695 return ;
684696
685697 state . finished = true ;
698+
699+ for ( const callback of state [ kOnFinished ] . splice ( 0 ) ) {
700+ callback ( ) ;
701+ }
702+
686703 stream . emit ( 'finish' ) ;
687704
688705 if ( state . autoDestroy ) {
@@ -701,26 +718,6 @@ function finish(stream, state) {
701718 }
702719}
703720
704- // TODO(ronag): Avoid using events to implement internal logic.
705- function onFinished ( stream , cb ) {
706- function onerror ( err ) {
707- stream . removeListener ( 'finish' , onfinish ) ;
708- stream . removeListener ( 'error' , onerror ) ;
709- cb ( err ) ;
710- if ( stream . listenerCount ( 'error' ) === 0 ) {
711- stream . emit ( 'error' , err ) ;
712- }
713- }
714-
715- function onfinish ( ) {
716- stream . removeListener ( 'finish' , onfinish ) ;
717- stream . removeListener ( 'error' , onerror ) ;
718- cb ( ) ;
719- }
720- stream . on ( 'finish' , onfinish ) ;
721- stream . prependListener ( 'error' , onerror ) ;
722- }
723-
724721ObjectDefineProperties ( Writable . prototype , {
725722
726723 destroyed : {
@@ -800,7 +797,7 @@ const destroy = destroyImpl.destroy;
800797Writable . prototype . destroy = function ( err , cb ) {
801798 const state = this . _writableState ;
802799 if ( ! state . destroyed ) {
803- process . nextTick ( errorBuffer , state , new ERR_STREAM_DESTROYED ( 'write' ) ) ;
800+ process . nextTick ( errorBuffer , state ) ;
804801 }
805802 destroy . call ( this , err , cb ) ;
806803 return this ;
0 commit comments