@@ -7,10 +7,12 @@ const {
77 NumberIsSafeInteger,
88 ObjectDefineProperty,
99 ObjectSetPrototypeOf,
10+ Symbol,
1011} = primordials ;
1112
1213const {
13- ERR_OUT_OF_RANGE
14+ ERR_OUT_OF_RANGE ,
15+ ERR_STREAM_DESTROYED
1416} = require ( 'internal/errors' ) . codes ;
1517const internalUtil = require ( 'internal/util' ) ;
1618const { validateNumber } = require ( 'internal/validators' ) ;
@@ -22,6 +24,8 @@ const {
2224} = require ( 'internal/fs/utils' ) ;
2325const { Readable, Writable } = require ( 'stream' ) ;
2426const { toPathIfFileURL } = require ( 'internal/url' ) ;
27+ const kIoDone = Symbol ( 'kIoDone' ) ;
28+ const kIsPerformingIO = Symbol ( 'kIsPerformingIO' ) ;
2529
2630const kMinPoolSpace = 128 ;
2731
@@ -86,6 +90,7 @@ function ReadStream(path, options) {
8690 this . pos = undefined ;
8791 this . bytesRead = 0 ;
8892 this . closed = false ;
93+ this [ kIsPerformingIO ] = false ;
8994
9095 if ( this . start !== undefined ) {
9196 checkPosition ( this . start , 'start' ) ;
@@ -155,6 +160,8 @@ ReadStream.prototype._read = function(n) {
155160 } ) ;
156161 }
157162
163+ if ( this . destroyed ) return ;
164+
158165 if ( ! pool || pool . length - pool . used < kMinPoolSpace ) {
159166 // Discard the old pool.
160167 allocNewPool ( this . readableHighWaterMark ) ;
@@ -178,7 +185,12 @@ ReadStream.prototype._read = function(n) {
178185 return this . push ( null ) ;
179186
180187 // the actual read.
188+ this [ kIsPerformingIO ] = true ;
181189 fs . read ( this . fd , pool , pool . used , toRead , this . pos , ( er , bytesRead ) => {
190+ this [ kIsPerformingIO ] = false ;
191+ // Tell ._destroy() that it's safe to close the fd now.
192+ if ( this . destroyed ) return this . emit ( kIoDone , er ) ;
193+
182194 if ( er ) {
183195 if ( this . autoClose ) {
184196 this . destroy ( ) ;
@@ -224,8 +236,12 @@ ReadStream.prototype._destroy = function(err, cb) {
224236 return ;
225237 }
226238
239+ if ( this [ kIsPerformingIO ] ) {
240+ this . once ( kIoDone , ( er ) => closeFsStream ( this , cb , err || er ) ) ;
241+ return ;
242+ }
243+
227244 closeFsStream ( this , cb , err ) ;
228- this . fd = null ;
229245} ;
230246
231247function closeFsStream ( stream , cb , err ) {
@@ -236,6 +252,8 @@ function closeFsStream(stream, cb, err) {
236252 if ( ! er )
237253 stream . emit ( 'close' ) ;
238254 } ) ;
255+
256+ stream . fd = null ;
239257}
240258
241259ReadStream . prototype . close = function ( cb ) {
@@ -274,6 +292,7 @@ function WriteStream(path, options) {
274292 this . pos = undefined ;
275293 this . bytesWritten = 0 ;
276294 this . closed = false ;
295+ this [ kIsPerformingIO ] = false ;
277296
278297 if ( this . start !== undefined ) {
279298 checkPosition ( this . start , 'start' ) ;
@@ -339,7 +358,17 @@ WriteStream.prototype._write = function(data, encoding, cb) {
339358 } ) ;
340359 }
341360
361+ if ( this . destroyed ) return cb ( new ERR_STREAM_DESTROYED ( 'write' ) ) ;
362+
363+ this [ kIsPerformingIO ] = true ;
342364 fs . write ( this . fd , data , 0 , data . length , this . pos , ( er , bytes ) => {
365+ this [ kIsPerformingIO ] = false ;
366+ // Tell ._destroy() that it's safe to close the fd now.
367+ if ( this . destroyed ) {
368+ cb ( er ) ;
369+ return this . emit ( kIoDone , er ) ;
370+ }
371+
343372 if ( er ) {
344373 if ( this . autoClose ) {
345374 this . destroy ( ) ;
@@ -362,7 +391,8 @@ WriteStream.prototype._writev = function(data, cb) {
362391 } ) ;
363392 }
364393
365- const self = this ;
394+ if ( this . destroyed ) return cb ( new ERR_STREAM_DESTROYED ( 'write' ) ) ;
395+
366396 const len = data . length ;
367397 const chunks = new Array ( len ) ;
368398 let size = 0 ;
@@ -374,12 +404,22 @@ WriteStream.prototype._writev = function(data, cb) {
374404 size += chunk . length ;
375405 }
376406
377- fs . writev ( this . fd , chunks , this . pos , function ( er , bytes ) {
407+ this [ kIsPerformingIO ] = true ;
408+ fs . writev ( this . fd , chunks , this . pos , ( er , bytes ) => {
409+ this [ kIsPerformingIO ] = false ;
410+ // Tell ._destroy() that it's safe to close the fd now.
411+ if ( this . destroyed ) {
412+ cb ( er ) ;
413+ return this . emit ( kIoDone , er ) ;
414+ }
415+
378416 if ( er ) {
379- self . destroy ( ) ;
417+ if ( this . autoClose ) {
418+ this . destroy ( ) ;
419+ }
380420 return cb ( er ) ;
381421 }
382- self . bytesWritten += bytes ;
422+ this . bytesWritten += bytes ;
383423 cb ( ) ;
384424 } ) ;
385425
0 commit comments