Skip to content

Commit 413580d

Browse files
committed
#2897: WIP - ensure the most recent interval tick is always processed, preventing lost checks under overload.
1 parent 03519fc commit 413580d

File tree

2 files changed

+37
-1
lines changed

2 files changed

+37
-1
lines changed

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class IntervalCheck {
4949

5050
private final String name;
5151

52+
@Getter
5253
private final Map<InstanceId, Instant> lastChecked = new ConcurrentHashMap<>();
5354

5455
private final Function<InstanceId, Mono<Void>> checkFn;
@@ -85,7 +86,8 @@ public IntervalCheck(String name, Function<InstanceId, Mono<Void>> checkFn, Dura
8586
public void start() {
8687
this.scheduler = Schedulers.newSingle(this.name + "-check");
8788
this.subscription = Flux.interval(this.interval)
88-
.onBackpressureDrop((tick) -> log.debug("Dropped check tick due to overload"))
89+
// ensure the most recent interval tick is always processed, preventing lost checks under overload.
90+
.onBackpressureLatest()
8991
.doOnSubscribe((s) -> log.debug("Scheduled {}-check every {}", this.name, this.interval))
9092
.log(log.getName(), Level.FINEST)
9193
.subscribeOn(this.scheduler)

spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/IntervalCheckTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ void should_not_wait_longer_than_maxBackoff() {
9999
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> verify(this.checkFn, atLeast(7)).apply(INSTANCE_ID));
100100
}
101101

102+
103+
102104
@Test
103105
void should_check_after_error() {
104106
this.intervalCheck.markAsChecked(INSTANCE_ID);
@@ -149,6 +151,38 @@ void should_not_overflow_when_checks_timeout_randomly() {
149151
}
150152
}
151153

154+
@Test
155+
void should_not_lose_checks_under_backpressure() {
156+
Duration CHECK_INTERVAL = Duration.ofMillis(100);
157+
158+
@SuppressWarnings("unchecked")
159+
Function<InstanceId, Mono<Void>> slowCheckFn = mock(Function.class);
160+
doAnswer((invocation) -> Mono.delay(CHECK_INTERVAL.plus(Duration.ofMillis(50))).then()).when(slowCheckFn)
161+
.apply(any());
162+
163+
IntervalCheck slowCheck = new IntervalCheck("backpressure-test", slowCheckFn, CHECK_INTERVAL,
164+
Duration.ofMillis(50), Duration.ofSeconds(1));
165+
166+
List<Long> checkTimes = new CopyOnWriteArrayList<>();
167+
doAnswer((invocation) -> {
168+
checkTimes.add(System.currentTimeMillis());
169+
return Mono.empty();
170+
}).when(slowCheckFn).apply(any());
171+
172+
slowCheck.markAsChecked(INSTANCE_ID);
173+
slowCheck.start();
174+
175+
try {
176+
await().atMost(Duration.ofSeconds(2)).until(() -> checkTimes.size() >= 5);
177+
// With onBackpressureLatest, we should have processed multiple checks without
178+
// drops
179+
assertThat(checkTimes).hasSizeGreaterThanOrEqualTo(5);
180+
}
181+
finally {
182+
slowCheck.stop();
183+
}
184+
}
185+
152186
@AfterEach
153187
void tearDown() {
154188
this.intervalCheck.stop();

0 commit comments

Comments
 (0)