13 | 13 | // limitations under the License. |
14 | 14 | package com.google.devtools.build.lib.remote; |
15 | 15 |
16 | | -import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; |
17 | | -import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer; |
| 16 | +import static com.google.common.base.Preconditions.checkArgument; |
| 17 | +import static com.google.common.base.Preconditions.checkState; |
| 18 | +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; |
| 19 | +import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable; |
| 20 | +import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle; |
| 21 | +import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer; |
| 22 | +import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult; |
18 | 23 | import static java.lang.String.format; |
19 | 24 |
20 | 25 | import build.bazel.remote.execution.v2.Digest; |
21 | 26 | import build.bazel.remote.execution.v2.Directory; |
| 27 | +import com.google.common.base.Throwables; |
22 | 28 | import com.google.common.collect.ImmutableSet; |
23 | 29 | import com.google.common.util.concurrent.Futures; |
24 | 30 | import com.google.common.util.concurrent.ListenableFuture; |
25 | | -import com.google.common.util.concurrent.MoreExecutors; |
26 | 31 | import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; |
27 | 32 | import com.google.devtools.build.lib.remote.common.RemoteCacheClient; |
28 | 33 | import com.google.devtools.build.lib.remote.merkletree.MerkleTree; |
29 | 34 | import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes; |
30 | 35 | import com.google.devtools.build.lib.remote.options.RemoteOptions; |
31 | 36 | import com.google.devtools.build.lib.remote.util.DigestUtil; |
32 | | -import com.google.devtools.build.lib.remote.util.RxFutures; |
| 37 | +import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult; |
33 | 38 | import com.google.protobuf.Message; |
34 | 39 | import io.reactivex.rxjava3.core.Completable; |
| 40 | +import io.reactivex.rxjava3.core.Flowable; |
| 41 | +import io.reactivex.rxjava3.core.Single; |
35 | 42 | import io.reactivex.rxjava3.subjects.AsyncSubject; |
36 | 43 | import java.io.IOException; |
37 | | -import java.util.ArrayList; |
38 | | -import java.util.List; |
| 44 | +import java.util.HashSet; |
39 | 45 | import java.util.Map; |
40 | | -import java.util.concurrent.ConcurrentHashMap; |
| 46 | +import java.util.Set; |
| 47 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 48 | +import javax.annotation.concurrent.GuardedBy; |
41 | 49 |
42 | 50 | /** A {@link RemoteCache} with additional functionality needed for remote execution. */ |
43 | 51 | public class RemoteExecutionCache extends RemoteCache { |
@@ -73,62 +81,58 @@ public void ensureInputsPresent( |
73 | 81 | .addAll(additionalInputs.keySet()) |
74 | 82 | .build(); |
75 | 83 |
76 | | - // Collect digests that are not being or already uploaded |
77 | | - ConcurrentHashMap<Digest, AsyncSubject<Boolean>> missingDigestSubjects = |
78 | | - new ConcurrentHashMap<>(); |
79 | | - |
80 | | - List<ListenableFuture<Void>> uploadFutures = new ArrayList<>(); |
81 | | - for (Digest digest : allDigests) { |
82 | | - Completable upload = |
83 | | - casUploadCache.execute( |
84 | | - digest, |
85 | | - Completable.defer( |
86 | | - () -> { |
87 | | - // The digest hasn't been processed, add it to the collection which will be used |
88 | | - // later for findMissingDigests call |
89 | | - AsyncSubject<Boolean> missingDigestSubject = AsyncSubject.create(); |
90 | | - missingDigestSubjects.put(digest, missingDigestSubject); |
91 | | - |
92 | | - return missingDigestSubject.flatMapCompletable( |
93 | | - missing -> { |
94 | | - if (!missing) { |
95 | | - return Completable.complete(); |
96 | | - } |
97 | | - return RxFutures.toCompletable( |
98 | | - () -> uploadBlob(context, digest, merkleTree, additionalInputs), |
99 | | - MoreExecutors.directExecutor()); |
100 | | - }); |
101 | | - }), |
102 | | - force); |
103 | | - uploadFutures.add(RxFutures.toListenableFuture(upload)); |
| 84 | + if (allDigests.isEmpty()) { |
| 85 | + return; |
104 | 86 | } |
105 | 87 |
106 | | - ImmutableSet<Digest> missingDigests; |
107 | | - try { |
108 | | - missingDigests = getFromFuture(findMissingDigests(context, missingDigestSubjects.keySet())); |
109 | | - } catch (IOException | InterruptedException e) { |
110 | | - for (Map.Entry<Digest, AsyncSubject<Boolean>> entry : missingDigestSubjects.entrySet()) { |
111 | | - entry.getValue().onError(e); |
112 | | - } |
| 88 | + MissingDigestFinder missingDigestFinder = new MissingDigestFinder(context, allDigests.size()); |
| 89 | + Flowable<TransferResult> uploads = |
| 90 | + Flowable.fromIterable(allDigests) |
| 91 | + .flatMapSingle( |
| 92 | + digest -> |
| 93 | + uploadBlobIfMissing( |
| 94 | + context, merkleTree, additionalInputs, force, missingDigestFinder, digest)); |
113 | 95 |
114 | | - if (e instanceof InterruptedException) { |
115 | | - Thread.currentThread().interrupt(); |
| 96 | + try { |
| 97 | + mergeBulkTransfer(uploads).blockingAwait(); |
| 98 | + } catch (RuntimeException e) { |
| 99 | + Throwable cause = e.getCause(); |
| 100 | + if (cause != null) { |
| 101 | + Throwables.throwIfInstanceOf(cause, InterruptedException.class); |
| 102 | + Throwables.throwIfInstanceOf(cause, IOException.class); |
116 | 103 | } |
117 | 104 | throw e; |
118 | 105 | } |
| 106 | + } |
119 | 107 |
120 | | - for (Map.Entry<Digest, AsyncSubject<Boolean>> entry : missingDigestSubjects.entrySet()) { |
121 | | - AsyncSubject<Boolean> missingSubject = entry.getValue(); |
122 | | - if (missingDigests.contains(entry.getKey())) { |
123 | | - missingSubject.onNext(true); |
124 | | - } else { |
125 | | - // The digest is already existed in the remote cache, skip the upload. |
126 | | - missingSubject.onNext(false); |
127 | | - } |
128 | | - missingSubject.onComplete(); |
129 | | - } |
130 | | - |
131 | | - waitForBulkTransfer(uploadFutures, /* cancelRemainingOnInterrupt=*/ false); |
| 108 | + private Single<TransferResult> uploadBlobIfMissing( |
| 109 | + RemoteActionExecutionContext context, |
| 110 | + MerkleTree merkleTree, |
| 111 | + Map<Digest, Message> additionalInputs, |
| 112 | + boolean force, |
| 113 | + MissingDigestFinder missingDigestFinder, |
| 114 | + Digest digest) { |
| 115 | + Completable upload = |
| 116 | + casUploadCache.execute( |
| 117 | + digest, |
| 118 | + Completable.defer( |
| 119 | + () -> |
| 120 | + // Only reached if an upload of this digest isn't already running or finished.
| 121 | + missingDigestFinder |
| 122 | + .registerAndCount(digest) |
| 123 | + .flatMapCompletable( |
| 124 | + missingDigests -> { |
| 125 | + if (missingDigests.contains(digest)) { |
| 126 | + return toCompletable( |
| 127 | + () -> uploadBlob(context, digest, merkleTree, additionalInputs), |
| 128 | + directExecutor()); |
| 129 | + } else { |
| 130 | + return Completable.complete(); |
| 131 | + } |
| 132 | + })), |
| 133 | + /* onIgnored= */ missingDigestFinder::count, |
| 134 | + force); |
| 135 | + return toTransferResult(upload); |
132 | 136 | } |
133 | 137 |
134 | 138 | private ListenableFuture<Void> uploadBlob( |
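(Editor's note, not part of the diff: the hunk above relies on the `RxUtils.toTransferResult` / `RxUtils.mergeBulkTransfer` helpers it imports. Below is a minimal, self-contained sketch of the semantics they appear to provide — each upload `Completable` is mapped to a value recording success or its `IOException`, and the merged `Completable` only fails after every transfer has finished. The helper names mirror the imports, but the implementation is an assumption, not Bazel's actual code; the real `TransferResult` presumably also tracks interruption, which is omitted here.)

```java
// Hedged sketch only: approximates the bulk-transfer helpers used in the hunk above.
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;

final class BulkTransferSketch {
  /** Result of a single transfer: {@code error == null} means success. */
  static final class TransferResult {
    static final TransferResult OK = new TransferResult(null);
    final IOException error;

    TransferResult(IOException error) {
      this.error = error;
    }
  }

  /** Converts a transfer Completable into a value that records success or the failure. */
  static Single<TransferResult> toTransferResult(Completable transfer) {
    return transfer
        .toSingleDefault(TransferResult.OK)
        .onErrorReturn(
            e ->
                new TransferResult(
                    e instanceof IOException ? (IOException) e : new IOException(e)));
  }

  /** Waits for all transfers, then fails if any of them failed. */
  static Completable mergeBulkTransfer(Flowable<TransferResult> transfers) {
    return transfers
        .toList()
        .flatMapCompletable(
            results -> {
              IOException aggregate = null;
              for (TransferResult result : results) {
                if (result.error != null) {
                  if (aggregate == null) {
                    aggregate = new IOException("bulk transfer failed");
                  }
                  aggregate.addSuppressed(result.error);
                }
              }
              return aggregate == null ? Completable.complete() : Completable.error(aggregate);
            });
  }
}
```

Because `blockingAwait()` rethrows a checked error wrapped in a `RuntimeException`, the catch block in the hunk unwraps the cause and rethrows `InterruptedException`/`IOException` before propagating anything else.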
@@ -160,4 +164,93 @@ private ListenableFuture<Void> uploadBlob( |
160 | 164 | "findMissingDigests returned a missing digest that has not been requested: %s", |
161 | 165 | digest))); |
162 | 166 | } |
| 167 | + |
| 168 | + /** |
| 169 | + * A missing digest finder that issues a single batched {@code FindMissingDigests} request
| 170 | + * once the internal counter reaches the expected count.
| 171 | + */ |
| 172 | + class MissingDigestFinder { |
| 173 | + private final int expectedCount; |
| 174 | + |
| 175 | + private final AsyncSubject<ImmutableSet<Digest>> digestsSubject; |
| 176 | + private final Single<ImmutableSet<Digest>> resultSingle; |
| 177 | + |
| 178 | + @GuardedBy("this") |
| 179 | + private final Set<Digest> digests; |
| 180 | + |
| 181 | + @GuardedBy("this") |
| 182 | + private int currentCount = 0; |
| 183 | + |
| 184 | + MissingDigestFinder(RemoteActionExecutionContext context, int expectedCount) { |
| 185 | + checkArgument(expectedCount > 0, "expectedCount should be greater than 0"); |
| 186 | + this.expectedCount = expectedCount; |
| 187 | + this.digestsSubject = AsyncSubject.create(); |
| 188 | + this.digests = new HashSet<>(); |
| 189 | + |
| 190 | + AtomicBoolean findMissingDigestsCalled = new AtomicBoolean(false); |
| 191 | + this.resultSingle = |
| 192 | + Single.fromObservable( |
| 193 | + digestsSubject |
| 194 | + .flatMapSingle( |
| 195 | + digests -> { |
| 196 | + boolean wasCalled = findMissingDigestsCalled.getAndSet(true); |
| 197 | + // Make sure we don't have re-subscription caused by refCount() below. |
| 198 | + checkState(!wasCalled, "FindMissingDigests is called more than once"); |
| 199 | + return toSingle( |
| 200 | + () -> findMissingDigests(context, digests), directExecutor()); |
| 201 | + }) |
| 202 | + // Use replay here because of a possible race: the downstream may not yet have
| 203 | + // been added to the subscription list (to receive the upstream result) when the
| 204 | + // upstream completes.
| 205 | + .replay(1) |
| 206 | + .refCount()); |
| 207 | + } |
| 208 | + |
| 209 | + /** |
| 210 | + * Registers the {@code digest} and increases the counter.
| 211 | + *
| 212 | + * <p>The returned Single must not be subscribed to more than once.
| 213 | + *
| 214 | + * @return a Single that emits the result of the {@code FindMissingDigests} request.
| 215 | + */ |
| 216 | + Single<ImmutableSet<Digest>> registerAndCount(Digest digest) { |
| 217 | + AtomicBoolean subscribed = new AtomicBoolean(false); |
| 218 | + // count() can trigger the findMissingDigests call. Adding the digest and counting before
| 219 | + // returning the Single would introduce a race: the result of findMissingDigests could
| 220 | + // become available before the consumer has subscribed to the returned Single. The
| 221 | + // consumer would then subscribe after the upstream has completed, causing a re-run of
| 222 | + // findMissingDigests (due to refCount()).
| 223 | + //
| 224 | + // We therefore call count() inside doOnSubscribe, so the consumer is guaranteed to have
| 225 | + // subscribed to the returned Single and findMissingDigests is not re-executed.
| 226 | + return resultSingle.doOnSubscribe( |
| 227 | + d -> { |
| 228 | + boolean wasSubscribed = subscribed.getAndSet(true); |
| 229 | + checkState(!wasSubscribed, "Single is subscribed more than once"); |
| 230 | + synchronized (this) { |
| 231 | + digests.add(digest); |
| 232 | + } |
| 233 | + count(); |
| 234 | + }); |
| 235 | + } |
| 236 | + |
| 237 | + /** Increases the counter; once it reaches the expected count, findMissingDigests is initiated. */
| 238 | + void count() { |
| 239 | + ImmutableSet<Digest> digestsResult = null; |
| 240 | + |
| 241 | + synchronized (this) { |
| 242 | + if (currentCount < expectedCount) { |
| 243 | + currentCount++; |
| 244 | + if (currentCount == expectedCount) { |
| 245 | + digestsResult = ImmutableSet.copyOf(digests); |
| 246 | + } |
| 247 | + } |
| 248 | + } |
| 249 | + |
| 250 | + if (digestsResult != null) { |
| 251 | + digestsSubject.onNext(digestsResult); |
| 252 | + digestsSubject.onComplete(); |
| 253 | + } |
| 254 | + } |
| 255 | + } |
163 | 256 | } |
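(Editor's illustration, not part of the commit: the coordination idea behind `MissingDigestFinder`, boiled down. Every digest either registers for the batched lookup or is merely counted — the `onIgnored` callback above — and only when the counter reaches the expected total is the single lookup fired, with all registered parties observing the same shared result. The class name, the `String` keys, and the fake lookup are made up for the demo; `registerAndCount`/`count` mirror the real method names.)

```java
// Editor's toy, not the Bazel class above: demonstrates the "count to N, then fire one
// shared batched lookup" coordination that MissingDigestFinder implements.
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.util.HashSet;
import java.util.Set;

final class BatchedLookupDemo {
  private final int expectedCount;
  private final Set<String> keys = new HashSet<>();
  private final AsyncSubject<Set<String>> result = AsyncSubject.create();
  private int currentCount = 0;

  BatchedLookupDemo(int expectedCount) {
    this.expectedCount = expectedCount;
  }

  /** Registers a key and counts it; the returned Single emits the one batched result. */
  synchronized Single<Set<String>> registerAndCount(String key) {
    keys.add(key);
    count();
    return Single.fromObservable(result);
  }

  /** Counts a participant that doesn't need the result (compare onIgnored above). */
  synchronized void count() {
    if (++currentCount == expectedCount) {
      // Stand-in for the single FindMissingDigests-style RPC; here every key is "missing".
      result.onNext(new HashSet<>(keys));
      result.onComplete();
    }
  }

  public static void main(String[] args) {
    BatchedLookupDemo demo = new BatchedLookupDemo(3);
    demo.registerAndCount("a").subscribe(missing -> System.out.println("a sees " + missing));
    demo.registerAndCount("b").subscribe(missing -> System.out.println("b sees " + missing));
    demo.count(); // third participant is only counted; this triggers the one batched lookup
  }
}
```

The toy can count before the caller subscribes because `AsyncSubject` caches its terminal value for late subscribers. The real class derives its result Single from the subject via `flatMapSingle`, so it additionally needs `replay(1).refCount()` plus the `doOnSubscribe` counting trick to ensure every consumer sees the single cached `FindMissingDigests` result rather than triggering a re-run.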