1616
1717import static com .google .common .base .Preconditions .checkNotNull ;
1818import static com .google .common .base .Preconditions .checkState ;
19+ import static java .lang .Math .min ;
1920
2021import com .google .common .annotations .VisibleForTesting ;
2122import com .google .common .base .Throwables ;
2223import com .google .common .io .ByteStreams ;
2324import com .google .devtools .build .lib .actions .ActionInput ;
2425import com .google .devtools .build .lib .actions .ActionInputHelper ;
2526import com .google .devtools .build .lib .actions .cache .VirtualActionInput ;
27+ import com .google .devtools .build .lib .remote .zstd .ZstdCompressingInputStream ;
2628import com .google .devtools .build .lib .vfs .Path ;
2729import com .google .protobuf .ByteString ;
2830import java .io .ByteArrayInputStream ;
29- import java .io .EOFException ;
3031import java .io .IOException ;
3132import java .io .InputStream ;
33+ import java .io .PushbackInputStream ;
3234import java .util .NoSuchElementException ;
3335import java .util .Objects ;
3436import java .util .function .Supplier ;
@@ -55,6 +57,10 @@ static int getDefaultChunkSize() {
5557 return defaultChunkSize ;
5658 }
5759
60+ public boolean isCompressed () {
61+ return compressed ;
62+ }
63+
5864 /** A piece of a byte[] blob. */
5965 public static final class Chunk {
6066
@@ -98,19 +104,22 @@ public int hashCode() {
98104 private final int chunkSize ;
99105 private final Chunk emptyChunk ;
100106
101- private InputStream data ;
107+ private ChunkerInputStream data ;
102108 private long offset ;
103109 private byte [] chunkCache ;
104110
111+ private final boolean compressed ;
112+
105113 // Set to true on the first call to next(). This is so that the Chunker can open its data source
106114 // lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
107115 private boolean initialized ;
108116
/**
 * Creates a chunker over a lazily opened data source.
 *
 * <p>The supplier is not invoked here; the stream is opened on first use.
 *
 * @param dataSupplier provides the input stream to chunk; must not be null
 * @param size byte length reported by {@code getSize()}; presumably the uncompressed length of
 *     the input (confirm against callers)
 * @param chunkSize upper bound on the number of bytes placed in a single chunk
 * @param compressed whether the input is passed through a Zstd-compressing stream before being
 *     split into chunks
 */
Chunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
  // Validate first so that no field is assigned from a bad argument list.
  this.dataSupplier = checkNotNull(dataSupplier);
  this.compressed = compressed;
  this.chunkSize = chunkSize;
  this.size = size;
  this.emptyChunk = new Chunk(ByteString.EMPTY, 0);
}
115124
116125 public long getOffset () {
@@ -127,13 +136,9 @@ public long getSize() {
127136 * <p>Closes any open resources (file handles, ...).
128137 */
129138 public void reset () throws IOException {
130- if (data != null ) {
131- data .close ();
132- }
133- data = null ;
139+ close ();
134140 offset = 0 ;
135141 initialized = false ;
136- chunkCache = null ;
137142 }
138143
139144 /**
@@ -148,6 +153,9 @@ public void seek(long toOffset) throws IOException {
148153 maybeInitialize ();
149154 ByteStreams .skipFully (data , toOffset - offset );
150155 offset = toOffset ;
156+ if (data .finished ()) {
157+ close ();
158+ }
151159 }
152160
153161 /**
@@ -157,6 +165,27 @@ public boolean hasNext() {
157165 return data != null || !initialized ;
158166 }
159167
168+ /** Closes the input stream and reset chunk cache */
169+ private void close () throws IOException {
170+ if (data != null ) {
171+ data .close ();
172+ data = null ;
173+ }
174+ chunkCache = null ;
175+ }
176+
177+ /** Attempts reading at most a full chunk and stores it in the chunkCache buffer */
178+ private int read () throws IOException {
179+ int count = 0 ;
180+ while (count < chunkCache .length ) {
181+ int c = data .read (chunkCache , count , chunkCache .length - count );
182+ if (c < 0 ) {
183+ break ;
184+ }
185+ count += c ;
186+ }
187+ return count ;
188+ }
160189 /**
161190 * Returns the next {@link Chunk} or throws a {@link NoSuchElementException} if no data is left.
162191 *
@@ -178,46 +207,40 @@ public Chunk next() throws IOException {
178207 return emptyChunk ;
179208 }
180209
181- // The cast to int is safe, because the return value is capped at chunkSize.
182- int bytesToRead = (int ) Math .min (bytesLeft (), chunkSize );
183- if (bytesToRead == 0 ) {
210+ if (data .finished ()) {
184211 chunkCache = null ;
185212 data = null ;
186213 throw new NoSuchElementException ();
187214 }
188215
189216 if (chunkCache == null ) {
217+ // If the output is compressed we can't know how many bytes there are yet to read,
218+ // so we allocate the whole chunkSize, otherwise we try to compute the smallest possible value
219+ // The cast to int is safe, because the return value is capped at chunkSize.
220+ int cacheSize = compressed ? chunkSize : (int ) min (getSize () - getOffset (), chunkSize );
190221 // Lazily allocate it in order to save memory on small data.
191222 // 1) cacheSize < chunkSize: There will only ever be one next() call.
192223 // 2) cacheSize == chunkSize: chunkCache will be set to its biggest possible value.
193224 // 3) cacheSize > chunkSize: Not possible, due to the min() above.
194- chunkCache = new byte [bytesToRead ];
225+ chunkCache = new byte [cacheSize ];
195226 }
196227
197228 long offsetBefore = offset ;
198- try {
199- ByteStreams .readFully (data , chunkCache , 0 , bytesToRead );
200- } catch (EOFException e ) {
201- throw new IllegalStateException ("Reached EOF, but expected "
202- + bytesToRead + " bytes." , e );
203- }
204- offset += bytesToRead ;
205229
206- ByteString blob = ByteString . copyFrom ( chunkCache , 0 , bytesToRead );
230+ int bytesRead = read ( );
207231
208- if (bytesLeft () == 0 ) {
209- data .close ();
210- data = null ;
211- chunkCache = null ;
232+ ByteString blob = ByteString .copyFrom (chunkCache , 0 , bytesRead );
233+
234+ // This has to happen after actualSize has been updated
235+ // or the guard in getActualSize won't work.
236+ offset += bytesRead ;
237+ if (data .finished ()) {
238+ close ();
212239 }
213240
214241 return new Chunk (blob , offsetBefore );
215242 }
216243
217- public long bytesLeft () {
218- return getSize () - getOffset ();
219- }
220-
221244 private void maybeInitialize () throws IOException {
222245 if (initialized ) {
223246 return ;
@@ -226,7 +249,10 @@ private void maybeInitialize() throws IOException {
226249 checkState (offset == 0 );
227250 checkState (chunkCache == null );
228251 try {
229- data = dataSupplier .get ();
252+ data =
253+ compressed
254+ ? new ChunkerInputStream (new ZstdCompressingInputStream (dataSupplier .get ()))
255+ : new ChunkerInputStream (dataSupplier .get ());
230256 } catch (RuntimeException e ) {
231257 Throwables .propagateIfPossible (e .getCause (), IOException .class );
232258 throw e ;
@@ -242,6 +268,7 @@ public static Builder builder() {
242268 public static class Builder {
243269 private int chunkSize = getDefaultChunkSize ();
244270 private long size ;
271+ private boolean compressed ;
245272 private Supplier <InputStream > inputStream ;
246273
247274 public Builder setInput (byte [] data ) {
@@ -251,6 +278,11 @@ public Builder setInput(byte[] data) {
251278 return this ;
252279 }
253280
281+ public Builder setCompressed (boolean compressed ) {
282+ this .compressed = compressed ;
283+ return this ;
284+ }
285+
254286 public Builder setInput (long size , InputStream in ) {
255287 checkState (inputStream == null );
256288 checkNotNull (in );
@@ -305,7 +337,22 @@ public Builder setChunkSize(int chunkSize) {
305337
306338 public Chunker build () {
307339 checkNotNull (inputStream );
308- return new Chunker (inputStream , size , chunkSize );
340+ return new Chunker (inputStream , size , chunkSize , compressed );
341+ }
342+ }
343+
344+ static class ChunkerInputStream extends PushbackInputStream {
345+ ChunkerInputStream (InputStream in ) {
346+ super (in );
347+ }
348+
349+ public boolean finished () throws IOException {
350+ int c = super .read ();
351+ if (c == -1 ) {
352+ return true ;
353+ }
354+ super .unread (c );
355+ return false ;
309356 }
310357 }
311358}
0 commit comments