1616
1717import  static  com .google .common .base .Preconditions .checkNotNull ;
1818import  static  com .google .common .base .Preconditions .checkState ;
19+ import  static  java .lang .Math .min ;
1920
2021import  com .google .common .annotations .VisibleForTesting ;
2122import  com .google .common .base .Throwables ;
2223import  com .google .common .io .ByteStreams ;
2324import  com .google .devtools .build .lib .actions .ActionInput ;
2425import  com .google .devtools .build .lib .actions .ActionInputHelper ;
2526import  com .google .devtools .build .lib .actions .cache .VirtualActionInput ;
27+ import  com .google .devtools .build .lib .remote .zstd .ZstdCompressingInputStream ;
2628import  com .google .devtools .build .lib .vfs .Path ;
2729import  com .google .protobuf .ByteString ;
2830import  java .io .ByteArrayInputStream ;
29- import  java .io .EOFException ;
3031import  java .io .IOException ;
3132import  java .io .InputStream ;
33+ import  java .io .PushbackInputStream ;
3234import  java .util .NoSuchElementException ;
3335import  java .util .Objects ;
3436import  java .util .function .Supplier ;
@@ -55,6 +57,10 @@ static int getDefaultChunkSize() {
5557    return  defaultChunkSize ;
5658  }
5759
60+   public  boolean  isCompressed () {
61+     return  compressed ;
62+   }
63+ 
5864  /** A piece of a byte[] blob. */ 
5965  public  static  final  class  Chunk  {
6066
@@ -98,19 +104,22 @@ public int hashCode() {
98104  private  final  int  chunkSize ;
99105  private  final  Chunk  emptyChunk ;
100106
101-   private  InputStream  data ;
107+   private  ChunkerInputStream  data ;
102108  private  long  offset ;
103109  private  byte [] chunkCache ;
104110
111+   private  final  boolean  compressed ;
112+ 
105113  // Set to true on the first call to next(). This is so that the Chunker can open its data source 
106114  // lazily on the first call to next(), as opposed to opening it in the constructor or on reset(). 
107115  private  boolean  initialized ;
108116
109-   Chunker (Supplier <InputStream > dataSupplier , long  size , int  chunkSize ) {
117+   Chunker (Supplier <InputStream > dataSupplier , long  size , int  chunkSize ,  boolean   compressed ) {
110118    this .dataSupplier  = checkNotNull (dataSupplier );
111119    this .size  = size ;
112120    this .chunkSize  = chunkSize ;
113121    this .emptyChunk  = new  Chunk (ByteString .EMPTY , 0 );
122+     this .compressed  = compressed ;
114123  }
115124
116125  public  long  getOffset () {
@@ -127,13 +136,9 @@ public long getSize() {
127136   * <p>Closes any open resources (file handles, ...). 
128137   */ 
129138  public  void  reset () throws  IOException  {
130-     if  (data  != null ) {
131-       data .close ();
132-     }
133-     data  = null ;
139+     close ();
134140    offset  = 0 ;
135141    initialized  = false ;
136-     chunkCache  = null ;
137142  }
138143
139144  /** 
@@ -148,6 +153,9 @@ public void seek(long toOffset) throws IOException {
148153    maybeInitialize ();
149154    ByteStreams .skipFully (data , toOffset  - offset );
150155    offset  = toOffset ;
156+     if  (data .finished ()) {
157+       close ();
158+     }
151159  }
152160
153161  /** 
@@ -157,6 +165,27 @@ public boolean hasNext() {
157165    return  data  != null  || !initialized ;
158166  }
159167
168+   /** Closes the input stream and reset chunk cache */ 
169+   private  void  close () throws  IOException  {
170+     if  (data  != null ) {
171+       data .close ();
172+       data  = null ;
173+     }
174+     chunkCache  = null ;
175+   }
176+ 
177+   /** Attempts reading at most a full chunk and stores it in the chunkCache buffer */ 
178+   private  int  read () throws  IOException  {
179+     int  count  = 0 ;
180+     while  (count  < chunkCache .length ) {
181+       int  c  = data .read (chunkCache , count , chunkCache .length  - count );
182+       if  (c  < 0 ) {
183+         break ;
184+       }
185+       count  += c ;
186+     }
187+     return  count ;
188+   }
160189  /** 
161190   * Returns the next {@link Chunk} or throws a {@link NoSuchElementException} if no data is left. 
162191   * 
@@ -178,46 +207,40 @@ public Chunk next() throws IOException {
178207      return  emptyChunk ;
179208    }
180209
181-     // The cast to int is safe, because the return value is capped at chunkSize. 
182-     int  bytesToRead  = (int ) Math .min (bytesLeft (), chunkSize );
183-     if  (bytesToRead  == 0 ) {
210+     if  (data .finished ()) {
184211      chunkCache  = null ;
185212      data  = null ;
186213      throw  new  NoSuchElementException ();
187214    }
188215
189216    if  (chunkCache  == null ) {
217+       // If the output is compressed we can't know how many bytes there are yet to read, 
218+       // so we allocate the whole chunkSize, otherwise we try to compute the smallest possible value 
219+       // The cast to int is safe, because the return value is capped at chunkSize. 
220+       int  cacheSize  = compressed  ? chunkSize  : (int ) min (getSize () - getOffset (), chunkSize );
190221      // Lazily allocate it in order to save memory on small data. 
191222      // 1) bytesToRead < chunkSize: There will only ever be one next() call. 
192223      // 2) bytesToRead == chunkSize: chunkCache will be set to its biggest possible value. 
193224      // 3) bytestoRead > chunkSize: Not possible, due to Math.min above. 
194-       chunkCache  = new  byte [bytesToRead ];
225+       chunkCache  = new  byte [cacheSize ];
195226    }
196227
197228    long  offsetBefore  = offset ;
198-     try  {
199-       ByteStreams .readFully (data , chunkCache , 0 , bytesToRead );
200-     } catch  (EOFException  e ) {
201-       throw  new  IllegalStateException ("Reached EOF, but expected " 
202-           + bytesToRead  + " bytes." , e );
203-     }
204-     offset  += bytesToRead ;
205229
206-     ByteString   blob  = ByteString . copyFrom ( chunkCache ,  0 ,  bytesToRead );
230+     int   bytesRead  = read ( );
207231
208-     if  (bytesLeft () == 0 ) {
209-       data .close ();
210-       data  = null ;
211-       chunkCache  = null ;
232+     ByteString  blob  = ByteString .copyFrom (chunkCache , 0 , bytesRead );
233+ 
234+     // This has to happen after actualSize has been updated 
235+     // or the guard in getActualSize won't work. 
236+     offset  += bytesRead ;
237+     if  (data .finished ()) {
238+       close ();
212239    }
213240
214241    return  new  Chunk (blob , offsetBefore );
215242  }
216243
217-   public  long  bytesLeft () {
218-     return  getSize () - getOffset ();
219-   }
220- 
221244  private  void  maybeInitialize () throws  IOException  {
222245    if  (initialized ) {
223246      return ;
@@ -226,7 +249,10 @@ private void maybeInitialize() throws IOException {
226249    checkState (offset  == 0 );
227250    checkState (chunkCache  == null );
228251    try  {
229-       data  = dataSupplier .get ();
252+       data  =
253+           compressed 
254+               ? new  ChunkerInputStream (new  ZstdCompressingInputStream (dataSupplier .get ()))
255+               : new  ChunkerInputStream (dataSupplier .get ());
230256    } catch  (RuntimeException  e ) {
231257      Throwables .propagateIfPossible (e .getCause (), IOException .class );
232258      throw  e ;
@@ -242,6 +268,7 @@ public static Builder builder() {
242268  public  static  class  Builder  {
243269    private  int  chunkSize  = getDefaultChunkSize ();
244270    private  long  size ;
271+     private  boolean  compressed ;
245272    private  Supplier <InputStream > inputStream ;
246273
247274    public  Builder  setInput (byte [] data ) {
@@ -251,6 +278,11 @@ public Builder setInput(byte[] data) {
251278      return  this ;
252279    }
253280
281+     public  Builder  setCompressed (boolean  compressed ) {
282+       this .compressed  = compressed ;
283+       return  this ;
284+     }
285+ 
254286    public  Builder  setInput (long  size , InputStream  in ) {
255287      checkState (inputStream  == null );
256288      checkNotNull (in );
@@ -305,7 +337,22 @@ public Builder setChunkSize(int chunkSize) {
305337
306338    public  Chunker  build () {
307339      checkNotNull (inputStream );
308-       return  new  Chunker (inputStream , size , chunkSize );
340+       return  new  Chunker (inputStream , size , chunkSize , compressed );
341+     }
342+   }
343+ 
344+   static  class  ChunkerInputStream  extends  PushbackInputStream  {
345+     ChunkerInputStream (InputStream  in ) {
346+       super (in );
347+     }
348+ 
349+     public  boolean  finished () throws  IOException  {
350+       int  c  = super .read ();
351+       if  (c  == -1 ) {
352+         return  true ;
353+       }
354+       super .unread (c );
355+       return  false ;
309356    }
310357  }
311358}
0 commit comments