Commit 68d6cef

#2897: WIP - ensure the most recent interval tick is always processed, preventing lost checks under overload.
1 parent 413580d commit 68d6cef

2 files changed: +76 -19 lines changed

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java

Lines changed: 12 additions & 9 deletions
@@ -73,10 +73,13 @@ public class IntervalCheck {
 	@Setter
 	private Consumer<Throwable> errorConsumer;
 
+	@Setter
+	private Consumer<Throwable> retryConsumer;
+
 	public IntervalCheck(String name, Function<InstanceId, Mono<Void>> checkFn, Duration interval,
 			Duration minRetention, Duration maxBackoff) {
 		this.name = name;
-		this.errorConsumer = (Throwable throwable) -> log.error("Unexpected error in {}-check", this.name, throwable);
+		this.retryConsumer = (Throwable throwable) -> log.warn("Unexpected error in {}-check", this.name, throwable);
 		this.checkFn = checkFn;
 		this.interval = interval;
 		this.minRetention = minRetention;

@@ -86,22 +89,22 @@ public IntervalCheck(String name, Function<InstanceId, Mono<Void>> checkFn, Dura
 	public void start() {
 		this.scheduler = Schedulers.newSingle(this.name + "-check");
 		this.subscription = Flux.interval(this.interval)
-			// ensure the most recent interval tick is always processed, preventing lost checks under overload.
+			// ensure the most recent interval tick is always processed, preventing
+			// lost checks under overload.
 			.onBackpressureLatest()
 			.doOnSubscribe((s) -> log.debug("Scheduled {}-check every {}", this.name, this.interval))
-			.log(log.getName(), Level.FINEST)
-			.subscribeOn(this.scheduler)
+			.log(log.getName(), Level.FINEST) //
+			.subscribeOn(this.scheduler) //
 			.flatMap((i) -> this.checkAllInstances()) // Allow concurrent check cycles
 													// if previous is slow
 			.retryWhen(createRetrySpec())
-			.subscribe(null, this.errorConsumer);
+			.subscribe(null, (Throwable error) -> log.error("Unexpected error in {}-check", this.name, error));
 	}
 
 	private Retry createRetrySpec() {
-		return Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(maxBackoff).doBeforeRetry((s) -> {
-			log.warn("Unexpected error in {}-check", this.name, s.failure());
-			this.errorConsumer.accept(s.failure());
-		});
+		return Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
+			.maxBackoff(maxBackoff)
+			.doBeforeRetry((s) -> this.retryConsumer.accept(s.failure()));
 	}
 
 	public void markAsChecked(InstanceId instanceId) {
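
For orientation, here is a minimal, self-contained sketch of the pipeline this file ends up with: an interval tick stream guarded by onBackpressureLatest(), flatMap so check cycles may overlap, an endless capped-backoff retry that notifies a retryConsumer hook before each attempt, and a terminal error handler on subscribe. The class name, the demo interval, and the println/System.err handlers are illustrative assumptions, not part of the commit.

	import java.time.Duration;
	import java.util.function.Consumer;

	import reactor.core.Disposable;
	import reactor.core.publisher.Flux;
	import reactor.core.publisher.Mono;
	import reactor.core.scheduler.Scheduler;
	import reactor.core.scheduler.Schedulers;
	import reactor.util.retry.Retry;

	public class IntervalCheckPipelineSketch {

		// mirrors the new retryConsumer hook: called before every retry attempt
		private final Consumer<Throwable> retryConsumer = (t) -> System.out.println("retrying check after: " + t);

		public Disposable start() {
			Scheduler scheduler = Schedulers.newSingle("demo-check");
			return Flux.interval(Duration.ofSeconds(10))
				// keep only the newest tick when checks fall behind, instead of
				// failing the whole stream with an overflow error
				.onBackpressureLatest()
				.subscribeOn(scheduler)
				// a new check cycle may start even if the previous one is still running
				.flatMap((tick) -> checkAllInstances())
				// retry forever with exponential backoff capped at one minute,
				// notifying the retry hook before each attempt
				.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
					.maxBackoff(Duration.ofMinutes(1))
					.doBeforeRetry((signal) -> this.retryConsumer.accept(signal.failure())))
				// only reached if the stream terminates despite the retry spec
				.subscribe(null, (error) -> System.err.println("check loop terminated: " + error));
		}

		private Mono<Void> checkAllInstances() {
			// placeholder for the real per-instance status checks
			return Mono.empty();
		}

	}

With this shape, a transient failure inside a check cycle surfaces through the retry hook and backoff, while the error consumer on subscribe only fires if the stream terminates for good.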

spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/IntervalCheckTest.java

Lines changed: 64 additions & 10 deletions
@@ -18,11 +18,17 @@
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
 import reactor.core.publisher.Mono;
 
 import de.codecentric.boot.admin.server.domain.values.InstanceId;

@@ -99,8 +105,6 @@ void should_not_wait_longer_than_maxBackoff() {
 		await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> verify(this.checkFn, atLeast(7)).apply(INSTANCE_ID));
 	}
 
-
-
 	@Test
 	void should_check_after_error() {
 		this.intervalCheck.markAsChecked(INSTANCE_ID);

@@ -126,25 +130,25 @@ void should_not_overflow_when_checks_timeout_randomly() {
 			}
 			else {
 				// Sometimes timeout
-				return Mono.just("slow response").delayElement(CHECK_INTERVAL.plus(Duration.ofMillis(100))).then();
+				return Mono.just("slow response").delayElement(CHECK_INTERVAL.plus(Duration.ofSeconds(1))).then();
 			}
 		}).when(timeoutCheckFn).apply(any());
 
 		IntervalCheck timeoutCheck = new IntervalCheck("overflow-test", timeoutCheckFn, CHECK_INTERVAL, CHECK_INTERVAL,
-				Duration.ofMillis(1000));
+				Duration.ofSeconds(1));
 
-		List<Throwable> errors = new CopyOnWriteArrayList<>();
+		List<Throwable> retryErrors = new CopyOnWriteArrayList<>();
 
-		timeoutCheck.setErrorConsumer(errors::add);
+		timeoutCheck.setRetryConsumer(retryErrors::add);
 		timeoutCheck.markAsChecked(INSTANCE_ID);
 		timeoutCheck.start();
 		try {
 			await().pollDelay(Duration.ofSeconds(5))
-				.until(() -> errors.stream()
+				.until(() -> retryErrors.stream()
 					.noneMatch((Throwable er) -> "OverflowException".equalsIgnoreCase(er.getClass().getSimpleName())));
 
-			assertThat(errors)
-				.noneMatch((Throwable e) -> "OverflowException".equalsIgnoreCase(e.getClass().getSimpleName()));
+			assertThat(retryErrors).noneMatch(
+					(Throwable e) -> "OverflowException".equalsIgnoreCase(e.getCause().getClass().getSimpleName()));
 		}
 		finally {
 			timeoutCheck.stop();

@@ -161,7 +165,7 @@ void should_not_lose_checks_under_backpressure() {
 			.apply(any());
 
 		IntervalCheck slowCheck = new IntervalCheck("backpressure-test", slowCheckFn, CHECK_INTERVAL,
-				Duration.ofMillis(50), Duration.ofSeconds(1));
+				Duration.ofMillis(50), Duration.ofSeconds(1));
 
 		List<Long> checkTimes = new CopyOnWriteArrayList<>();
 		doAnswer((invocation) -> {

@@ -183,9 +187,59 @@ void should_not_lose_checks_under_backpressure() {
 		}
 	}
 
+	@Test
+	void should_not_lose_checks_under_backpressure_latest() {
+		Duration CHECK_INTERVAL = Duration.ofMillis(100);
+
+		@SuppressWarnings("unchecked")
+		Function<InstanceId, Mono<Void>> slowCheckFn = mock(Function.class);
+		doAnswer((invocation) -> Mono.delay(CHECK_INTERVAL.plus(Duration.ofMillis(50))).then()).when(slowCheckFn)
+			.apply(any());
+
+		IntervalCheck slowCheck = new IntervalCheck("backpressure-test", slowCheckFn, CHECK_INTERVAL, CHECK_INTERVAL,
+				Duration.ofSeconds(1));
+
+		// Add multiple instances to increase load and cause drops
+		Set<InstanceId> instanceIds = IntStream.range(0, 50)
+			.mapToObj((i) -> InstanceId.of("Test" + i))
+			.collect(Collectors.toSet());
+
+		instanceIds.forEach(slowCheck::markAsChecked);
+
+		List<Long> checkTimes = new CopyOnWriteArrayList<>();
+		Map<String, List<Long>> checkTimesPerInstance = new ConcurrentHashMap<>();
+
+		doAnswer((invocation) -> {
+			long checkTime = System.currentTimeMillis();
+			String instanceId = instanceIdString(invocation);
+			List<Long> checkTimesList = checkTimesPerInstance.computeIfAbsent(instanceId,
+					(String k) -> new CopyOnWriteArrayList<>());
+			checkTimesList.add(checkTime);
+			checkTimes.add(checkTime);
+			return Mono.empty();
+		}).when(slowCheckFn).apply(any());
+
+		slowCheck.start();
+
+		try {
+			await().atMost(Duration.ofSeconds(2)).until(() -> checkTimes.size() > 50);
+			// With onBackpressureLatest, we should process more checks without drops
+			assertThat(checkTimes).hasSizeGreaterThanOrEqualTo(100);
+			instanceIds.forEach((InstanceId instanceId) -> assertThat(checkTimesPerInstance.get(instanceId.getValue()))
+				.hasSizeGreaterThanOrEqualTo(2)); // Expect more checks
+		}
+		finally {
+			slowCheck.stop();
+		}
+	}
+
 	@AfterEach
 	void tearDown() {
 		this.intervalCheck.stop();
 	}
 
+	private static String instanceIdString(InvocationOnMock invocation) {
+		return invocation.getArguments()[0].toString();
+	}
+
 }
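
The OverflowException assertions above exist because a bare Flux.interval signals an overflow error as soon as downstream demand cannot keep up with the ticks, whereas onBackpressureLatest() trades that failure for "keep only the newest tick". The following standalone sketch contrasts the two behaviours; the class name, 1 ms tick rate, and 50 ms processing delay are illustrative assumptions, not values taken from the tests.

	import java.time.Duration;
	import java.util.concurrent.CountDownLatch;

	import reactor.core.publisher.Flux;
	import reactor.core.publisher.Mono;

	public class BackpressureLatestDemo {

		public static void main(String[] args) throws InterruptedException {
			CountDownLatch failed = new CountDownLatch(1);

			// Without onBackpressureLatest: each tick takes ~50 ms to process while
			// ticks arrive every 1 ms, so demand runs out and Flux.interval errors
			// with an OverflowException ("Could not emit tick ... due to lack of requests").
			Flux.interval(Duration.ofMillis(1))
				.concatMap((tick) -> Mono.delay(Duration.ofMillis(50)).thenReturn(tick), 1)
				.subscribe((tick) -> System.out.println("plain interval processed tick " + tick),
						(error) -> {
							System.out.println("plain interval failed: " + error);
							failed.countDown();
						});
			failed.await();

			// With onBackpressureLatest: the slow consumer simply skips intermediate
			// ticks and always receives the most recent one; no error is signalled.
			Flux.interval(Duration.ofMillis(1))
				.onBackpressureLatest()
				.concatMap((tick) -> Mono.delay(Duration.ofMillis(50)).thenReturn(tick), 1)
				.take(5)
				.doOnNext((tick) -> System.out.println("latest-only processed tick " + tick))
				.blockLast();
		}

	}

The first pipeline terminates with an error almost immediately; the second keeps running, which is the property the new should_not_lose_checks_under_backpressure_latest test exercises against IntervalCheck with many registered instances.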
