|
74 | 74 |
|
75 | 75 | import javax.naming.NamingException; |
76 | 76 | import java.time.Duration; |
| 77 | +import java.util.Arrays; |
77 | 78 | import java.util.List; |
78 | 79 | import java.util.Map; |
79 | 80 | import java.util.Objects; |
|
83 | 84 | import java.util.concurrent.atomic.AtomicBoolean; |
84 | 85 | import java.util.concurrent.atomic.AtomicInteger; |
85 | 86 | import java.util.concurrent.atomic.AtomicReference; |
| 87 | +import java.util.function.Predicate; |
86 | 88 | import java.util.stream.Collectors; |
87 | 89 | import java.util.stream.Stream; |
88 | 90 |
|
@@ -962,33 +964,49 @@ private Mono<ProposedBucketConfigContext> fetchBucketConfigs(final String name, |
962 | 964 |
|
963 | 965 | return loadBucketConfigForSeed(identifier, mappedKvPort, mappedManagerPort, name, alternateAddress); |
964 | 966 | }) |
965 | | - .retryWhen( |
966 | | - Retry.from(companion -> companion.flatMap(rs -> { |
967 | | - final Throwable f = rs.failure(); |
968 | | - if (shutdown.get()) { |
969 | | - return Mono.error(new AlreadyShutdownException()); |
970 | | - } |
971 | | - if (f instanceof UnsupportedConfigMechanismException) { |
972 | | - return Mono.error(Exceptions.propagate(f)); |
973 | | - } |
974 | | - |
975 | | - boolean bucketNotFound = f instanceof BucketNotFoundDuringLoadException; |
976 | | - boolean bucketNotReady = f instanceof BucketNotReadyDuringLoadException; |
977 | | - boolean noAccess = f instanceof NoAccessDuringConfigLoadException; |
978 | | - |
979 | | - // For some, wait a bit longer; retry the rest quickly. |
980 | | - Duration delay = bucketNotFound || bucketNotReady || noAccess |
981 | | - ? Duration.ofMillis(500) |
982 | | - : Duration.ofMillis(1); |
983 | | - eventBus.publish(new BucketOpenRetriedEvent(name, delay, core.context(), f)); |
984 | | - return Mono |
985 | | - .just(rs.totalRetries()) |
986 | | - .delayElement(delay, core.context().environment().scheduler()); |
987 | | - }))) |
| 967 | + // Exponential backoff for certain errors. |
| 968 | + .retryWhen(Retry |
| 969 | + .backoff(Long.MAX_VALUE, Duration.ofMillis(500)) |
| 970 | + .maxBackoff(Duration.ofSeconds(10)) |
| 971 | + .filter(bucketConfigLoadRetryFilter(name, t -> isInstanceOfAnyOf(t, |
| 972 | + BucketNotFoundDuringLoadException.class, |
| 973 | + BucketNotReadyDuringLoadException.class, |
| 974 | + NoAccessDuringConfigLoadException.class |
| 975 | + ))) |
| 976 | + ) |
| 977 | + // Short fixed delay for the others we want to retry. |
| 978 | + .retryWhen(Retry |
| 979 | + .fixedDelay(Long.MAX_VALUE, Duration.ofMillis(10)) |
| 980 | + .filter(bucketConfigLoadRetryFilter(name, t -> !(t instanceof UnsupportedConfigMechanismException))) |
| 981 | + ) |
988 | 982 | ) |
989 | 983 | .next(); |
990 | 984 | } |
991 | 985 |
|
| 986 | + /** |
| 987 | + * Wraps a retry filter with common logic that checks for shutdown and publishes events. |
| 988 | + */ |
| 989 | + private Predicate<? super Throwable> bucketConfigLoadRetryFilter( |
| 990 | + String bucketName, |
| 991 | + Predicate<? super Throwable> errorFilter |
| 992 | + ) { |
| 993 | + return t -> { |
| 994 | + if (shutdown.get()) { |
| 995 | + throw new AlreadyShutdownException(); |
| 996 | + } |
| 997 | + |
| 998 | + boolean retry = errorFilter.test(t); |
| 999 | + if (retry) { |
| 1000 | + eventBus.publish(new BucketOpenRetriedEvent(bucketName, Duration.ZERO, core.context(), t)); |
| 1001 | + } |
| 1002 | + return retry; |
| 1003 | + }; |
| 1004 | + } |
| 1005 | + |
| 1006 | + private static boolean isInstanceOfAnyOf(Object o, Class<?>... candidates) { |
| 1007 | + return Arrays.stream(candidates).anyMatch(it -> it.isInstance(o)); |
| 1008 | + } |
| 1009 | + |
992 | 1010 | private Mono<ProposedGlobalConfigContext> fetchGlobalConfigs(final Set<SeedNode> seedNodes, final boolean tls, |
993 | 1011 | boolean allowStaleSeeds, boolean retryTimeouts) { |
994 | 1012 | final AtomicBoolean hasErrored = new AtomicBoolean(); |
|
0 commit comments