|
1 | 1 | 'use strict'; |
2 | 2 |
|
| 3 | +const finished = require('internal/streams/end-of-stream'); |
| 4 | + |
3 | 5 | const kLastResolve = Symbol('lastResolve'); |
4 | 6 | const kLastReject = Symbol('lastReject'); |
5 | 7 | const kError = Symbol('error'); |
@@ -34,30 +36,6 @@ function onReadable(iter) { |
34 | 36 | process.nextTick(readAndResolve, iter); |
35 | 37 | } |
36 | 38 |
|
37 | | -function onEnd(iter) { |
38 | | - const resolve = iter[kLastResolve]; |
39 | | - if (resolve !== null) { |
40 | | - iter[kLastPromise] = null; |
41 | | - iter[kLastResolve] = null; |
42 | | - iter[kLastReject] = null; |
43 | | - resolve(createIterResult(null, true)); |
44 | | - } |
45 | | - iter[kEnded] = true; |
46 | | -} |
47 | | - |
48 | | -function onError(iter, err) { |
49 | | - const reject = iter[kLastReject]; |
50 | | - // reject if we are waiting for data in the Promise |
51 | | - // returned by next() and store the error |
52 | | - if (reject !== null) { |
53 | | - iter[kLastPromise] = null; |
54 | | - iter[kLastResolve] = null; |
55 | | - iter[kLastReject] = null; |
56 | | - reject(err); |
57 | | - } |
58 | | - iter[kError] = err; |
59 | | -} |
60 | | - |
61 | 39 | function wrapForNext(lastPromise, iter) { |
62 | 40 | return function(resolve, reject) { |
63 | 41 | lastPromise.then(function() { |
@@ -86,6 +64,22 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ |
86 | 64 | return Promise.resolve(createIterResult(null, true)); |
87 | 65 | } |
88 | 66 |
|
| 67 | + if (this[kStream].destroyed) { |
| 68 | + // We need to defer via nextTick because if .destroy(err) is |
| 69 | + // called, the error will be emitted via nextTick, and |
| 70 | + // we cannot guarantee that there is no error lingering around |
| 71 | + // waiting to be emitted. |
| 72 | + return new Promise((resolve, reject) => { |
| 73 | + process.nextTick(() => { |
| 74 | + if (this[kError]) { |
| 75 | + reject(this[kError]); |
| 76 | + } else { |
| 77 | + resolve(createIterResult(null, true)); |
| 78 | + } |
| 79 | + }); |
| 80 | + }); |
| 81 | + } |
| 82 | + |
89 | 83 | // if we have multiple next() calls |
90 | 84 | // we will wait for the previous Promise to finish |
91 | 85 | // this logic is optimized to support for await loops, |
@@ -155,9 +149,32 @@ const createReadableStreamAsyncIterator = (stream) => { |
155 | 149 | }, |
156 | 150 | }); |
157 | 151 |
|
| 152 | + finished(stream, (err) => { |
| 153 | + if (err) { |
| 154 | + const reject = iterator[kLastReject]; |
| 155 | + // reject if we are waiting for data in the Promise |
| 156 | + // returned by next() and store the error |
| 157 | + if (reject !== null) { |
| 158 | + iterator[kLastPromise] = null; |
| 159 | + iterator[kLastResolve] = null; |
| 160 | + iterator[kLastReject] = null; |
| 161 | + reject(err); |
| 162 | + } |
| 163 | + iterator[kError] = err; |
| 164 | + return; |
| 165 | + } |
| 166 | + |
| 167 | + const resolve = iterator[kLastResolve]; |
| 168 | + if (resolve !== null) { |
| 169 | + iterator[kLastPromise] = null; |
| 170 | + iterator[kLastResolve] = null; |
| 171 | + iterator[kLastReject] = null; |
| 172 | + resolve(createIterResult(null, true)); |
| 173 | + } |
| 174 | + iterator[kEnded] = true; |
| 175 | + }); |
| 176 | + |
158 | 177 | stream.on('readable', onReadable.bind(null, iterator)); |
159 | | - stream.on('end', onEnd.bind(null, iterator)); |
160 | | - stream.on('error', onError.bind(null, iterator)); |
161 | 178 |
|
162 | 179 | return iterator; |
163 | 180 | }; |
|
0 commit comments