Skip to content

Commit 3ec079e

Browse files
authored
Build from Node v10.15.3 (#402)
1 parent 5b90ed2 commit 3ec079e

15 files changed

+588
-225
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ npm install --save readable-stream
1515

1616
This package is a mirror of the streams implementations in Node.js.
1717

18-
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.15.2/docs/api/stream.html).
18+
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.15.3/docs/api/stream.html).
1919

2020
If you want to guarantee a stable streams base, regardless of what version of
2121
Node you, or the users of your libraries are using, use **readable-stream** *only* and avoid the *"stream"* module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html).

build/build.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ pump(
146146
file !== 'test-stream-base-prototype-accessors.js' &&
147147
file !== 'test-stream-base-prototype-accessors-enumerability.js' &&
148148
file !== 'test-stream-wrap-drain.js' &&
149+
file !== 'test-stream-pipeline-http2.js' &&
149150
file !== 'test-stream-base-typechecking.js') {
150151
processTestFile(file)
151152
}

build/files.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,17 @@ module.exports['internal/streams/async_iterator.js'] = [
337337

338338
module.exports['internal/streams/end-of-stream.js'] = [
339339
, errorsTwoLevel
340+
, [
341+
/const \{ once \} = require\('internal\/util'\);/,
342+
`function once(callback) {
343+
let called = false;
344+
return function(...args) {
345+
if (called) return;
346+
called = true;
347+
callback.apply(this, args);
348+
};
349+
}`
350+
]
340351
]
341352

342353
module.exports['internal/streams/pipeline.js'] = [

lib/_stream_readable.js

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -544,13 +544,35 @@ function maybeReadMore(stream, state) {
544544
}
545545

546546
function maybeReadMore_(stream, state) {
547-
var len = state.length;
548-
549-
while (!state.reading && !state.ended && state.length < state.highWaterMark) {
547+
// Attempt to read more data if we should.
548+
//
549+
// The conditions for reading more data are (one of):
550+
// - Not enough data buffered (state.length < state.highWaterMark). The loop
551+
// is responsible for filling the buffer with enough data if such data
552+
// is available. If highWaterMark is 0 and we are not in the flowing mode
553+
// we should _not_ attempt to buffer any extra data. We'll get more data
554+
// when the stream consumer calls read() instead.
555+
// - No data in the buffer, and the stream is in flowing mode. In this mode
556+
// the loop below is responsible for ensuring read() is called. Failing to
557+
// call read here would abort the flow and there's no other mechanism for
558+
// continuing the flow if the stream consumer has just subscribed to the
559+
// 'data' event.
560+
//
561+
// In addition to the above conditions to keep reading data, the following
562+
// conditions prevent the data from being read:
563+
// - The stream has ended (state.ended).
564+
// - There is already a pending 'read' operation (state.reading). This is a
565+
// case where the the stream has called the implementation defined _read()
566+
// method, but they are processing the call asynchronously and have _not_
567+
// called push() with new data. In this case we skip performing more
568+
// read()s. The execution ends in this method again after the _read() ends
569+
// up calling push() with more data.
570+
while (!state.reading && !state.ended && (state.length < state.highWaterMark || state.flowing && state.length === 0)) {
571+
var len = state.length;
550572
debug('maybeReadMore read 0');
551573
stream.read(0);
552574
if (len === state.length) // didn't get any data, stop spinning.
553-
break;else len = state.length;
575+
break;
554576
}
555577

556578
state.readingMore = false;

lib/internal/streams/async_iterator.js

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ function onReadable(iter) {
4747
function wrapForNext(lastPromise, iter) {
4848
return function (resolve, reject) {
4949
lastPromise.then(function () {
50+
if (iter[kEnded]) {
51+
resolve(createIterResult(undefined, true));
52+
return;
53+
}
54+
5055
iter[kHandlePromise](resolve, reject);
5156
}, reject);
5257
};
@@ -70,7 +75,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro
7075
}
7176

7277
if (this[kEnded]) {
73-
return Promise.resolve(createIterResult(null, true));
78+
return Promise.resolve(createIterResult(undefined, true));
7479
}
7580

7681
if (this[kStream].destroyed) {
@@ -83,7 +88,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro
8388
if (_this[kError]) {
8489
reject(_this[kError]);
8590
} else {
86-
resolve(createIterResult(null, true));
91+
resolve(createIterResult(undefined, true));
8792
}
8893
});
8994
});
@@ -128,7 +133,7 @@ var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPro
128133
return;
129134
}
130135

131-
resolve(createIterResult(null, true));
136+
resolve(createIterResult(undefined, true));
132137
});
133138
});
134139
}), _Object$setPrototypeO), AsyncIteratorPrototype);
@@ -151,9 +156,6 @@ var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterat
151156
}), _defineProperty(_Object$create, kEnded, {
152157
value: stream._readableState.endEmitted,
153158
writable: true
154-
}), _defineProperty(_Object$create, kLastPromise, {
155-
value: null,
156-
writable: true
157159
}), _defineProperty(_Object$create, kHandlePromise, {
158160
value: function value(resolve, reject) {
159161
var data = iterator[kStream].read();
@@ -170,6 +172,7 @@ var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterat
170172
},
171173
writable: true
172174
}), _Object$create));
175+
iterator[kLastPromise] = null;
173176
finished(stream, function (err) {
174177
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
175178
var reject = iterator[kLastReject]; // reject if we are waiting for data in the Promise
@@ -192,7 +195,7 @@ var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterat
192195
iterator[kLastPromise] = null;
193196
iterator[kLastResolve] = null;
194197
iterator[kLastReject] = null;
195-
resolve(createIterResult(null, true));
198+
resolve(createIterResult(undefined, true));
196199
}
197200

198201
iterator[kEnded] = true;

lib/internal/streams/end-of-stream.js

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,41 +4,50 @@
44

55
var ERR_STREAM_PREMATURE_CLOSE = require('../../../errors').codes.ERR_STREAM_PREMATURE_CLOSE;
66

7-
function noop() {}
8-
9-
function isRequest(stream) {
10-
return stream.setHeader && typeof stream.abort === 'function';
11-
}
12-
137
function once(callback) {
148
var called = false;
15-
return function (err) {
9+
return function () {
1610
if (called) return;
1711
called = true;
18-
callback.call(this, err);
12+
13+
for (var _len = arguments.length, args = new Array(_len), _key = 0; _key < _len; _key++) {
14+
args[_key] = arguments[_key];
15+
}
16+
17+
callback.apply(this, args);
1918
};
2019
}
2120

21+
function noop() {}
22+
23+
function isRequest(stream) {
24+
return stream.setHeader && typeof stream.abort === 'function';
25+
}
26+
2227
function eos(stream, opts, callback) {
2328
if (typeof opts === 'function') return eos(stream, null, opts);
2429
if (!opts) opts = {};
2530
callback = once(callback || noop);
26-
var ws = stream._writableState;
27-
var rs = stream._readableState;
2831
var readable = opts.readable || opts.readable !== false && stream.readable;
2932
var writable = opts.writable || opts.writable !== false && stream.writable;
3033

3134
var onlegacyfinish = function onlegacyfinish() {
3235
if (!stream.writable) onfinish();
3336
};
3437

38+
var writableEnded = stream._writableState && stream._writableState.finished;
39+
3540
var onfinish = function onfinish() {
3641
writable = false;
42+
writableEnded = true;
3743
if (!readable) callback.call(stream);
3844
};
3945

46+
var readableEnded = stream._readableState && stream._readableState.endEmitted;
47+
4048
var onend = function onend() {
4149
readable = false;
50+
readableEnded = true;
4251
if (!writable) callback.call(stream);
4352
};
4453

@@ -47,12 +56,16 @@ function eos(stream, opts, callback) {
4756
};
4857

4958
var onclose = function onclose() {
50-
if (readable && !(rs && rs.ended)) {
51-
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
59+
var err;
60+
61+
if (readable && !readableEnded) {
62+
if (!stream._readableState || !stream._readableState.ended) err = new ERR_STREAM_PREMATURE_CLOSE();
63+
return callback.call(stream, err);
5264
}
5365

54-
if (writable && !(ws && ws.ended)) {
55-
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
66+
if (writable && !writableEnded) {
67+
if (!stream._writableState || !stream._writableState.ended) err = new ERR_STREAM_PREMATURE_CLOSE();
68+
return callback.call(stream, err);
5669
}
5770
};
5871

@@ -64,7 +77,7 @@ function eos(stream, opts, callback) {
6477
stream.on('complete', onfinish);
6578
stream.on('abort', onclose);
6679
if (stream.req) onrequest();else stream.on('request', onrequest);
67-
} else if (writable && !ws) {
80+
} else if (writable && !stream._writableState) {
6881
// legacy streams
6982
stream.on('end', onlegacyfinish);
7083
stream.on('close', onlegacyfinish);
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"use strict";
2+
3+
/*<replacement>*/
4+
var bufferShim = require('safe-buffer').Buffer;
5+
/*</replacement>*/
6+
7+
8+
var common = require('../common');
9+
10+
var assert = require('assert/');
11+
12+
var _require = require('../../'),
13+
Readable = _require.Readable,
14+
Duplex = _require.Duplex,
15+
pipeline = _require.pipeline; // Test that the callback for pipeline() is called even when the ._destroy()
16+
// method of the stream places an .end() request to itself that does not
17+
// get processed before the destruction of the stream (i.e. the 'close' event).
18+
// Refs: https://github.com/nodejs/node/issues/24456
19+
20+
21+
var readable = new Readable({
22+
read: common.mustCall(function () {})
23+
});
24+
var duplex = new Duplex({
25+
write: function write(chunk, enc, cb) {// Simulate messages queueing up.
26+
},
27+
read: function read() {},
28+
destroy: function destroy(err, cb) {
29+
// Call end() from inside the destroy() method, like HTTP/2 streams
30+
// do at the time of writing.
31+
this.end();
32+
cb(err);
33+
}
34+
});
35+
duplex.on('finished', common.mustNotCall());
36+
pipeline(readable, duplex, common.mustCall(function (err) {
37+
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
38+
})); // Write one chunk of data, and destroy the stream later.
39+
// That should trigger the pipeline destruction.
40+
41+
readable.push('foo');
42+
setImmediate(function () {
43+
readable.destroy();
44+
});
45+
;
46+
47+
require('tap').pass('sync run');
48+
49+
var _list = process.listeners('uncaughtException');
50+
51+
process.removeAllListeners('uncaughtException');
52+
53+
_list.pop();
54+
55+
_list.forEach(function (e) {
56+
return process.on('uncaughtException', e);
57+
});

test/parallel/test-stream-pipeline.js

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ var bufferShim = require('safe-buffer').Buffer;
1111

1212
var common = require('../common');
1313

14-
if (!common.hasCrypto) common.skip('missing crypto');
15-
1614
var _require = require('../../'),
1715
Stream = _require.Stream,
1816
Writable = _require.Writable,
@@ -24,14 +22,6 @@ var assert = require('assert/');
2422

2523
var http = require('http');
2624

27-
var http2 = {
28-
createServer: function createServer() {
29-
return {
30-
listen: function listen() {}
31-
};
32-
}
33-
};
34-
3525
var promisify = require('util-promisify');
3626

3727
{
@@ -281,34 +271,6 @@ var promisify = require('util-promisify');
281271
});
282272
});
283273
}
284-
{
285-
var _server4 = http2.createServer(function (req, res) {
286-
pipeline(req, res, common.mustCall());
287-
});
288-
289-
_server4.listen(0, function () {
290-
var url = "http://localhost:".concat(_server4.address().port);
291-
var client = http2.connect(url);
292-
var req = client.request({
293-
':method': 'POST'
294-
});
295-
var rs = new Readable({
296-
read: function read() {
297-
rs.push('hello');
298-
}
299-
});
300-
pipeline(rs, req, common.mustCall(function (err) {
301-
_server4.close();
302-
303-
client.close();
304-
}));
305-
var cnt = 10;
306-
req.on('data', function (data) {
307-
cnt--;
308-
if (cnt === 0) rs.destroy();
309-
});
310-
});
311-
}
312274
{
313275
var makeTransform = function makeTransform() {
314276
var tr = new Transform({

0 commit comments

Comments
 (0)