1313// limitations under the License. 
1414package  com .google .devtools .build .lib .remote ;
1515
16- import  static  com .google .common .base .Preconditions .checkArgument ;
17- import  static  com .google .common .base .Preconditions .checkState ;
16+ import  static  com .google .common .collect .ImmutableList .toImmutableList ;
17+ import  static  com .google .common .util .concurrent .Futures .immediateFailedFuture ;
18+ import  static  com .google .common .util .concurrent .Futures .immediateFuture ;
1819import  static  com .google .common .util .concurrent .MoreExecutors .directExecutor ;
1920import  static  com .google .devtools .build .lib .remote .util .RxFutures .toCompletable ;
2021import  static  com .google .devtools .build .lib .remote .util .RxFutures .toSingle ;
2526import  build .bazel .remote .execution .v2 .Digest ;
2627import  build .bazel .remote .execution .v2 .Directory ;
2728import  com .google .common .base .Throwables ;
29+ import  com .google .common .collect .ImmutableList ;
2830import  com .google .common .collect .ImmutableSet ;
29- import  com .google .common .util .concurrent .Futures ;
3031import  com .google .common .util .concurrent .ListenableFuture ;
32+ import  com .google .devtools .build .lib .profiler .Profiler ;
33+ import  com .google .devtools .build .lib .profiler .SilentCloseable ;
3134import  com .google .devtools .build .lib .remote .common .RemoteActionExecutionContext ;
3235import  com .google .devtools .build .lib .remote .common .RemoteCacheClient ;
3336import  com .google .devtools .build .lib .remote .merkletree .MerkleTree ;
3639import  com .google .devtools .build .lib .remote .util .DigestUtil ;
3740import  com .google .devtools .build .lib .remote .util .RxUtils .TransferResult ;
3841import  com .google .protobuf .Message ;
42+ import  io .reactivex .rxjava3 .annotations .NonNull ;
3943import  io .reactivex .rxjava3 .core .Completable ;
44+ import  io .reactivex .rxjava3 .core .CompletableObserver ;
4045import  io .reactivex .rxjava3 .core .Flowable ;
46+ import  io .reactivex .rxjava3 .core .Maybe ;
47+ import  io .reactivex .rxjava3 .core .Observable ;
4148import  io .reactivex .rxjava3 .core .Single ;
49+ import  io .reactivex .rxjava3 .core .SingleEmitter ;
50+ import  io .reactivex .rxjava3 .disposables .Disposable ;
4251import  io .reactivex .rxjava3 .subjects .AsyncSubject ;
4352import  java .io .IOException ;
44- import  java .util .HashSet ;
53+ import  java .util .List ;
4554import  java .util .Map ;
46- import  java .util .Set ;
47- import  java .util .concurrent .atomic .AtomicBoolean ;
48- import  javax .annotation .concurrent .GuardedBy ;
55+ import  java .util .concurrent .atomic .AtomicReference ;
4956
5057/** A {@link RemoteCache} with additional functionality needed for remote execution. */ 
5158public  class  RemoteExecutionCache  extends  RemoteCache  {
@@ -85,13 +92,10 @@ public void ensureInputsPresent(
8592      return ;
8693    }
8794
88-     MissingDigestFinder  missingDigestFinder  = new  MissingDigestFinder (context , allDigests .size ());
8995    Flowable <TransferResult > uploads  =
90-         Flowable .fromIterable (allDigests )
91-             .flatMapSingle (
92-                 digest  ->
93-                     uploadBlobIfMissing (
94-                         context , merkleTree , additionalInputs , force , missingDigestFinder , digest ));
96+         createUploadTasks (context , merkleTree , additionalInputs , allDigests , force )
97+             .flatMap (uploadTasks  -> findMissingBlobs (context , uploadTasks ))
98+             .flatMapPublisher (this ::waitForUploadTasks );
9599
96100    try  {
97101      mergeBulkTransfer (uploads ).blockingAwait ();
@@ -105,36 +109,6 @@ public void ensureInputsPresent(
105109    }
106110  }
107111
108-   private  Single <TransferResult > uploadBlobIfMissing (
109-       RemoteActionExecutionContext  context ,
110-       MerkleTree  merkleTree ,
111-       Map <Digest , Message > additionalInputs ,
112-       boolean  force ,
113-       MissingDigestFinder  missingDigestFinder ,
114-       Digest  digest ) {
115-     Completable  upload  =
116-         casUploadCache .execute (
117-             digest ,
118-             Completable .defer (
119-                 () ->
120-                     // Only reach here if the digest is missing and is not being uploaded. 
121-                     missingDigestFinder 
122-                         .registerAndCount (digest )
123-                         .flatMapCompletable (
124-                             missingDigests  -> {
125-                               if  (missingDigests .contains (digest )) {
126-                                 return  toCompletable (
127-                                     () -> uploadBlob (context , digest , merkleTree , additionalInputs ),
128-                                     directExecutor ());
129-                               } else  {
130-                                 return  Completable .complete ();
131-                               }
132-                             })),
133-             /* onIgnored= */  missingDigestFinder ::count ,
134-             force );
135-     return  toTransferResult (upload );
136-   }
137- 
138112  private  ListenableFuture <Void > uploadBlob (
139113      RemoteActionExecutionContext  context ,
140114      Digest  digest ,
@@ -158,99 +132,159 @@ private ListenableFuture<Void> uploadBlob(
158132      return  cacheProtocol .uploadBlob (context , digest , message .toByteString ());
159133    }
160134
161-     return  Futures . immediateFailedFuture (
135+     return  immediateFailedFuture (
162136        new  IOException (
163137            format (
164138                "findMissingDigests returned a missing digest that has not been requested: %s" ,
165139                digest )));
166140  }
167141
168-   /** 
169-    * A missing digest finder that initiates the request when the internal counter reaches an 
170-    * expected count. 
171-    */ 
172-   class  MissingDigestFinder  {
173-     private  final  int  expectedCount ;
174- 
175-     private  final  AsyncSubject <ImmutableSet <Digest >> digestsSubject ;
176-     private  final  Single <ImmutableSet <Digest >> resultSingle ;
142+   static  class  UploadTask  {
143+     Digest  digest ;
144+     AtomicReference <Disposable > disposable ;
145+     SingleEmitter <Boolean > continuation ;
146+     Completable  completion ;
147+   }
177148
178-     @ GuardedBy ("this" )
179-     private  final  Set <Digest > digests ;
149+   private  Single <List <UploadTask >> createUploadTasks (
150+       RemoteActionExecutionContext  context ,
151+       MerkleTree  merkleTree ,
152+       Map <Digest , Message > additionalInputs ,
153+       Iterable <Digest > allDigests ,
154+       boolean  force ) {
155+     return  Single .using (
156+         () -> Profiler .instance ().profile ("collect digests" ),
157+         ignored  ->
158+             Flowable .fromIterable (allDigests )
159+                 .flatMapMaybe (
160+                     digest  ->
161+                         maybeCreateUploadTask (context , merkleTree , additionalInputs , digest , force ))
162+                 .collect (toImmutableList ()),
163+         SilentCloseable ::close );
164+   }
180165
181-     @ GuardedBy ("this" )
182-     private  int  currentCount  = 0 ;
166+   private  Maybe <UploadTask > maybeCreateUploadTask (
167+       RemoteActionExecutionContext  context ,
168+       MerkleTree  merkleTree ,
169+       Map <Digest , Message > additionalInputs ,
170+       Digest  digest ,
171+       boolean  force ) {
172+     return  Maybe .create (
173+         emitter  -> {
174+           AsyncSubject <Void > completion  = AsyncSubject .create ();
175+           UploadTask  uploadTask  = new  UploadTask ();
176+           uploadTask .digest  = digest ;
177+           uploadTask .disposable  = new  AtomicReference <>();
178+           uploadTask .completion  =
179+               Completable .fromObservable (
180+                   completion .doOnDispose (
181+                       () -> {
182+                         Disposable  d  = uploadTask .disposable .getAndSet (null );
183+                         if  (d  != null ) {
184+                           d .dispose ();
185+                         }
186+                       }));
187+           Completable  upload  =
188+               casUploadCache .execute (
189+                   digest ,
190+                   Single .<Boolean >create (
191+                           continuation  -> {
192+                             uploadTask .continuation  = continuation ;
193+                             emitter .onSuccess (uploadTask );
194+                           })
195+                       .flatMapCompletable (
196+                           shouldUpload  -> {
197+                             if  (!shouldUpload ) {
198+                               return  Completable .complete ();
199+                             }
183200
184-     MissingDigestFinder (RemoteActionExecutionContext  context , int  expectedCount ) {
185-       checkArgument (expectedCount  > 0 , "expectedCount should be greater than 0" );
186-       this .expectedCount  = expectedCount ;
187-       this .digestsSubject  = AsyncSubject .create ();
188-       this .digests  = new  HashSet <>();
201+                             return  toCompletable (
202+                                 () ->
203+                                     uploadBlob (
204+                                         context , uploadTask .digest , merkleTree , additionalInputs ),
205+                                 directExecutor ());
206+                           }),
207+                   /* onAlreadyRunning= */  () -> emitter .onSuccess (uploadTask ),
208+                   /* onAlreadyFinished= */  emitter ::onComplete ,
209+                   force );
210+           upload .subscribe (
211+               new  CompletableObserver () {
212+                 @ Override 
213+                 public  void  onSubscribe (@ NonNull  Disposable  d ) {
214+                   uploadTask .disposable .set (d );
215+                 }
189216
190-       AtomicBoolean  findMissingDigestsCalled  = new  AtomicBoolean (false );
191-       this .resultSingle  =
192-           Single .fromObservable (
193-               digestsSubject 
194-                   .flatMapSingle (
195-                       digests  -> {
196-                         boolean  wasCalled  = findMissingDigestsCalled .getAndSet (true );
197-                         // Make sure we don't have re-subscription caused by refCount() below. 
198-                         checkState (!wasCalled , "FindMissingDigests is called more than once" );
199-                         return  toSingle (
200-                             () -> findMissingDigests (context , digests ), directExecutor ());
201-                       })
202-                   // Use replay here because we could have a race condition that downstream hasn't 
203-                   // been added to the subscription list (to receive the upstream result) while 
204-                   // upstream is completed. 
205-                   .replay (1 )
206-                   .refCount ());
207-     }
217+                 @ Override 
218+                 public  void  onComplete () {
219+                   completion .onComplete ();
220+                 }
208221
209-     /** 
210-      * Register the {@code digest} and increase the counter. 
211-      * 
212-      * <p>Returned Single cannot be subscribed more than once. 
213-      * 
214-      * @return Single that emits the result of the {@code FindMissingDigest} request. 
215-      */ 
216-     Single <ImmutableSet <Digest >> registerAndCount (Digest  digest ) {
217-       AtomicBoolean  subscribed  = new  AtomicBoolean (false );
218-       // count() will potentially trigger the findMissingDigests call. Adding and counting before 
219-       // returning the Single could introduce a race that the result of findMissingDigests is 
220-       // available but the consumer doesn't get it because it hasn't subscribed the returned 
221-       // Single. In this case, it subscribes after upstream is completed resulting a re-run of 
222-       // findMissingDigests (due to refCount()). 
223-       // 
224-       // Calling count() inside doOnSubscribe to ensure the consumer already subscribed to the 
225-       // returned Single to avoid a re-execution of findMissingDigests. 
226-       return  resultSingle .doOnSubscribe (
227-           d  -> {
228-             boolean  wasSubscribed  = subscribed .getAndSet (true );
229-             checkState (!wasSubscribed , "Single is subscribed more than once" );
230-             synchronized  (this ) {
231-               digests .add (digest );
232-             }
233-             count ();
234-           });
235-     }
222+                 @ Override 
223+                 public  void  onError (@ NonNull  Throwable  e ) {
224+                   Disposable  d  = uploadTask .disposable .get ();
225+                   if  (d  != null  && d .isDisposed ()) {
226+                     return ;
227+                   }
236228
237-     /** Increase the counter. */ 
238-     void  count () {
239-       ImmutableSet <Digest > digestsResult  = null ;
229+                   completion .onError (e );
230+                 }
231+               });
232+         });
233+   }
240234
241-       synchronized  (this ) {
242-         if  (currentCount  < expectedCount ) {
243-           currentCount ++;
244-           if  (currentCount  == expectedCount ) {
245-             digestsResult  = ImmutableSet .copyOf (digests );
246-           }
247-         }
248-       }
235+   private  Single <List <UploadTask >> findMissingBlobs (
236+       RemoteActionExecutionContext  context , List <UploadTask > uploadTasks ) {
237+     return  Single .using (
238+         () -> Profiler .instance ().profile ("findMissingDigests" ),
239+         ignored  ->
240+             Single .fromObservable (
241+                     Observable .fromSingle (
242+                             toSingle (
243+                                     () -> {
244+                                       ImmutableList <Digest > digestsToQuery  =
245+                                           uploadTasks .stream ()
246+                                               .filter (uploadTask  -> uploadTask .continuation  != null )
247+                                               .map (uploadTask  -> uploadTask .digest )
248+                                               .collect (toImmutableList ());
249+                                       if  (digestsToQuery .isEmpty ()) {
250+                                         return  immediateFuture (ImmutableSet .of ());
251+                                       }
252+                                       return  findMissingDigests (context , digestsToQuery );
253+                                     },
254+                                     directExecutor ())
255+                                 .map (
256+                                     missingDigests  -> {
257+                                       for  (UploadTask  uploadTask  : uploadTasks ) {
258+                                         if  (uploadTask .continuation  != null ) {
259+                                           uploadTask .continuation .onSuccess (
260+                                               missingDigests .contains (uploadTask .digest ));
261+                                         }
262+                                       }
263+                                       return  uploadTasks ;
264+                                     }))
265+                         // Use AsyncSubject so that if downstream is disposed, the 
266+                         // findMissingDigests call is not cancelled (because it may be needed by 
267+                         // other 
268+                         // threads). 
269+                         .subscribeWith (AsyncSubject .create ()))
270+                 .doOnDispose (
271+                     () -> {
272+                       for  (UploadTask  uploadTask  : uploadTasks ) {
273+                         Disposable  d  = uploadTask .disposable .getAndSet (null );
274+                         if  (d  != null ) {
275+                           d .dispose ();
276+                         }
277+                       }
278+                     }),
279+         SilentCloseable ::close );
280+   }
249281
250-       if  (digestsResult  != null ) {
251-         digestsSubject .onNext (digestsResult );
252-         digestsSubject .onComplete ();
253-       }
254-     }
282+   private  Flowable <TransferResult > waitForUploadTasks (List <UploadTask > uploadTasks ) {
283+     return  Flowable .using (
284+         () -> Profiler .instance ().profile ("upload" ),
285+         ignored  ->
286+             Flowable .fromIterable (uploadTasks )
287+                 .flatMapSingle (uploadTask  -> toTransferResult (uploadTask .completion )),
288+         SilentCloseable ::close );
255289  }
256290}
0 commit comments