Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ function Transform(options) {
}

function final(cb) {
let called = false;
if (typeof this._flush === 'function' && !this.destroyed) {
this._flush((er, data) => {
const result = this._flush((er, data) => {
called = true;
if (er) {
if (cb) {
cb(er);
Expand All @@ -126,6 +128,33 @@ function final(cb) {
cb();
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
(data) => {
if (called)
return;
if (data != null)
this.push(data);
this.push(null);
if (cb)
process.nextTick(cb);
},
(err) => {
if (cb) {
process.nextTick(cb, err);
} else {
process.nextTick(() => this.destroy(err));
}
});
}
} catch (err) {
process.nextTick(() => this.destroy(err));
}
}
} else {
this.push(null);
if (cb) {
Expand All @@ -151,7 +180,9 @@ Transform.prototype._write = function(chunk, encoding, callback) {
const wState = this._writableState;
const length = rState.length;

this._transform(chunk, encoding, (err, val) => {
let called = false;
const result = this._transform(chunk, encoding, (err, val) => {
called = true;
if (err) {
callback(err);
return;
Expand All @@ -172,6 +203,38 @@ Transform.prototype._write = function(chunk, encoding, callback) {
this[kCallback] = callback;
}
});
if (result !== undefined && result != null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
(val) => {
if (called)
return;

if (val != null) {
this.push(val);
}

if (
wState.ended ||
length === rState.length ||
rState.length < rState.highWaterMark ||
rState.length === 0) {
process.nextTick(callback);
} else {
this[kCallback] = callback;
}
},
(err) => {
process.nextTick(callback, err);
});
}
} catch (err) {
process.nextTick(callback, err);
}
}
};

Transform.prototype._read = function() {
Expand Down
27 changes: 26 additions & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ function needFinish(state) {
function callFinal(stream, state) {
state.sync = true;
state.pendingcb++;
stream._final((err) => {
const result = stream._final((err) => {
state.pendingcb--;
if (err) {
for (const callback of state[kOnFinished].splice(0)) {
Expand All @@ -664,6 +664,31 @@ function callFinal(stream, state) {
process.nextTick(finish, stream, state);
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
if (state.prefinished)
return;
state.prefinish = true;
process.nextTick(() => stream.emit('prefinish'));
state.pendingcb++;
process.nextTick(finish, stream, state);
},
function(err) {
for (const callback of state[kOnFinished].splice(0)) {
process.nextTick(callback, err);
}
process.nextTick(errorOrDestroy, stream, err, state.sync);
});
}
} catch (err) {
process.nextTick(errorOrDestroy, stream, err, state.sync);
}
}
state.sync = false;
}

Expand Down
106 changes: 104 additions & 2 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@ function destroy(err, cb) {
}

function _destroy(self, err, cb) {
self._destroy(err || null, (err) => {
let called = false;
const result = self._destroy(err || null, (err) => {
const r = self._readableState;
const w = self._writableState;

called = true;

if (err) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack;
Expand Down Expand Up @@ -92,6 +95,64 @@ function _destroy(self, err, cb) {
process.nextTick(emitCloseNT, self);
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
if (called)
return;

const r = self._readableState;
const w = self._writableState;

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (typeof cb === 'function') {
process.nextTick(cb);
}

process.nextTick(emitCloseNT, self);
},
function(err) {
const r = self._readableState;
const w = self._writableState;
err.stack;

called = true;

if (w && !w.errored) {
w.errored = err;
}
if (r && !r.errored) {
r.errored = err;
}

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (typeof cb === 'function') {
process.nextTick(cb, err);
}

process.nextTick(emitErrorCloseNT, self, err);
});
}
} catch (err) {
process.nextTick(emitErrorNT, self, err);
}
}
}

function emitErrorCloseNT(self, err) {
Expand Down Expand Up @@ -230,7 +291,7 @@ function constructNT(stream) {
const s = w || r;

let called = false;
stream._construct((err) => {
const result = stream._construct((err) => {
if (r) {
r.constructed = true;
}
Expand All @@ -252,6 +313,47 @@ function constructNT(stream) {
process.nextTick(emitConstructNT, stream);
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
// If the callback was invoked, do nothing further.
if (called)
return;
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
if (s.destroyed) {
process.nextTick(() => stream.emit(kDestroy));
} else {
process.nextTick(emitConstructNT, stream);
}
},
function(err) {
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
called = true;
if (s.destroyed) {
process.nextTick(() => stream.emit(kDestroy, err));
} else {
process.nextTick(errorOrDestroy, stream, err);
}
});
}
} catch (err) {
process.nextTick(emitErrorNT, stream, err);
}
}
}

function emitConstructNT(stream) {
Expand Down
Loading