11'use strict'
22
33const { pipeline } = require ( './pipeline' )
4-
54const Duplex = require ( './duplex' )
6-
75const { destroyer } = require ( './destroy' )
8-
96const { isNodeStream, isReadable, isWritable } = require ( './utils' )
10-
117const {
128 AbortError,
139 codes : { ERR_INVALID_ARG_VALUE , ERR_MISSING_ARGS }
1410} = require ( '../../ours/errors' )
15-
1611module . exports = function compose ( ...streams ) {
1712 if ( streams . length === 0 ) {
1813 throw new ERR_MISSING_ARGS ( 'streams' )
1914 }
20-
2115 if ( streams . length === 1 ) {
2216 return Duplex . from ( streams [ 0 ] )
2317 }
24-
2518 const orgStreams = [ ...streams ]
26-
2719 if ( typeof streams [ 0 ] === 'function' ) {
2820 streams [ 0 ] = Duplex . from ( streams [ 0 ] )
2921 }
30-
3122 if ( typeof streams [ streams . length - 1 ] === 'function' ) {
3223 const idx = streams . length - 1
3324 streams [ idx ] = Duplex . from ( streams [ idx ] )
3425 }
35-
3626 for ( let n = 0 ; n < streams . length ; ++ n ) {
3727 if ( ! isNodeStream ( streams [ n ] ) ) {
3828 // TODO(ronag): Add checks for non streams.
3929 continue
4030 }
41-
4231 if ( n < streams . length - 1 && ! isReadable ( streams [ n ] ) ) {
4332 throw new ERR_INVALID_ARG_VALUE ( `streams[${ n } ]` , orgStreams [ n ] , 'must be readable' )
4433 }
45-
4634 if ( n > 0 && ! isWritable ( streams [ n ] ) ) {
4735 throw new ERR_INVALID_ARG_VALUE ( `streams[${ n } ]` , orgStreams [ n ] , 'must be writable' )
4836 }
4937 }
50-
5138 let ondrain
5239 let onfinish
5340 let onreadable
5441 let onclose
5542 let d
56-
5743 function onfinished ( err ) {
5844 const cb = onclose
5945 onclose = null
60-
6146 if ( cb ) {
6247 cb ( err )
6348 } else if ( err ) {
@@ -66,22 +51,21 @@ module.exports = function compose(...streams) {
6651 d . destroy ( )
6752 }
6853 }
69-
7054 const head = streams [ 0 ]
7155 const tail = pipeline ( streams , onfinished )
7256 const writable = ! ! isWritable ( head )
73- const readable = ! ! isReadable ( tail ) // TODO(ronag): Avoid double buffering.
57+ const readable = ! ! isReadable ( tail )
58+
59+ // TODO(ronag): Avoid double buffering.
7460 // Implement Writable/Readable/Duplex traits.
7561 // See, https://github.com/nodejs/node/pull/33515.
76-
7762 d = new Duplex ( {
7863 // TODO (ronag): highWaterMark?
7964 writableObjectMode : ! ! ( head !== null && head !== undefined && head . writableObjectMode ) ,
8065 readableObjectMode : ! ! ( tail !== null && tail !== undefined && tail . writableObjectMode ) ,
8166 writable,
8267 readable
8368 } )
84-
8569 if ( writable ) {
8670 d . _write = function ( chunk , encoding , callback ) {
8771 if ( head . write ( chunk , encoding ) ) {
@@ -90,12 +74,10 @@ module.exports = function compose(...streams) {
9074 ondrain = callback
9175 }
9276 }
93-
9477 d . _final = function ( callback ) {
9578 head . end ( )
9679 onfinish = callback
9780 }
98-
9981 head . on ( 'drain' , function ( ) {
10082 if ( ondrain ) {
10183 const cb = ondrain
@@ -111,7 +93,6 @@ module.exports = function compose(...streams) {
11193 }
11294 } )
11395 }
114-
11596 if ( readable ) {
11697 tail . on ( 'readable' , function ( ) {
11798 if ( onreadable ) {
@@ -123,39 +104,32 @@ module.exports = function compose(...streams) {
123104 tail . on ( 'end' , function ( ) {
124105 d . push ( null )
125106 } )
126-
127107 d . _read = function ( ) {
128108 while ( true ) {
129109 const buf = tail . read ( )
130-
131110 if ( buf === null ) {
132111 onreadable = d . _read
133112 return
134113 }
135-
136114 if ( ! d . push ( buf ) ) {
137115 return
138116 }
139117 }
140118 }
141119 }
142-
143120 d . _destroy = function ( err , callback ) {
144121 if ( ! err && onclose !== null ) {
145122 err = new AbortError ( )
146123 }
147-
148124 onreadable = null
149125 ondrain = null
150126 onfinish = null
151-
152127 if ( onclose === null ) {
153128 callback ( err )
154129 } else {
155130 onclose = callback
156131 destroyer ( tail , err )
157132 }
158133 }
159-
160134 return d
161135}
0 commit comments