Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
752dde0
Added backpressure handling to prevent memory buildup and system degr…
ulischulte Oct 24, 2025
048fe4b
Fix checkstyle errors
ulischulte Oct 24, 2025
03519fc
Fix checkstyle and javaformat errors
ulischulte Oct 24, 2025
413580d
#2897: WIP - ensure the most recent interval tick is always processed…
ulischulte Oct 25, 2025
dfd7bfd
#2897: WIP - ensure the most recent interval tick is always processed…
ulischulte Oct 25, 2025
a0b9834
Merge branch 'master' into fix/2897-prevent-overflow-on-repeated-fail…
ulischulte Oct 25, 2025
b737a84
Merge remote-tracking branch 'origin/master' into fix/2897-prevent-ov…
ulischulte Oct 27, 2025
a7c3687
#2897: WIP - adjust test measurements
ulischulte Oct 27, 2025
5d9bbed
#2897: WIP - adjust test measurements
ulischulte Oct 27, 2025
bb311a7
#2897: WIP - adjust test measurements
ulischulte Oct 27, 2025
3737a69
fix checkstyle errors
ulischulte Oct 27, 2025
1a01b67
cleanup
ulischulte Oct 27, 2025
a1464c7
cleanup
ulischulte Oct 27, 2025
c9ca5b7
increase waiting time
ulischulte Oct 31, 2025
3e0056f
Merge branch 'master' into fix/2897-prevent-overflow-on-repeated-fail…
ulischulte Oct 31, 2025
aef7b08
Merge branch 'master' into fix/2897-prevent-overflow-on-repeated-fail…
ulischulte Nov 3, 2025
37422f1
Merge branch 'master' into fix/2897-prevent-overflow-on-repeated-fail…
ulischulte Nov 7, 2025
ee2287d
Update spring-boot-admin-server/src/test/java/de/codecentric/boot/adm…
ulischulte Nov 7, 2025
f382220
Update spring-boot-admin-server/src/test/java/de/codecentric/boot/adm…
ulischulte Nov 7, 2025
786de5f
Change test according to copilot review
ulischulte Nov 7, 2025
70028e0
Change test according to copilot review
ulischulte Nov 7, 2025
6002974
Update spring-boot-admin-server/src/test/java/de/codecentric/boot/adm…
ulischulte Nov 7, 2025
0580cee
Update spring-boot-admin-server/src/test/java/de/codecentric/boot/adm…
ulischulte Nov 7, 2025
4500c33
Update spring-boot-admin-server/src/test/java/de/codecentric/boot/adm…
ulischulte Nov 7, 2025
b08ef27
Implement copilot's review suggestions
ulischulte Nov 7, 2025
8bb946c
format
ulischulte Nov 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;

import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -68,9 +71,14 @@ public class IntervalCheck {
/**
 * Scheduler the interval subscription is subscribed on. It is created in
 * {@code start()} as a single-threaded scheduler named {@code <name>-check}.
 */
@Nullable
private Scheduler scheduler;

/**
 * Callback that receives the failure before each retry of the check pipeline. The
 * constructor installs a default that logs the failure at warn level; the
 * Lombok-generated setter allows callers (e.g. tests) to replace it.
 */
@Setter
@NonNull
private Consumer<Throwable> retryConsumer;

public IntervalCheck(String name, Function<InstanceId, Mono<Void>> checkFn, Duration interval,
Duration minRetention, Duration maxBackoff) {
this.name = name;
this.retryConsumer = (Throwable throwable) -> log.warn("Unexpected error in {}-check", this.name, throwable);
this.checkFn = checkFn;
this.interval = interval;
this.minRetention = minRetention;
Expand All @@ -80,14 +88,22 @@ public IntervalCheck(String name, Function<InstanceId, Mono<Void>> checkFn, Dura
public void start() {
this.scheduler = Schedulers.newSingle(this.name + "-check");
this.subscription = Flux.interval(this.interval)
// ensure the most recent interval tick is always processed, preventing
// lost checks under overload.
.onBackpressureLatest()
.doOnSubscribe((s) -> log.debug("Scheduled {}-check every {}", this.name, this.interval))
.log(log.getName(), Level.FINEST)
.subscribeOn(this.scheduler)
.concatMap((i) -> this.checkAllInstances())
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
.maxBackoff(maxBackoff)
.doBeforeRetry((s) -> log.warn("Unexpected error in {}-check", this.name, s.failure())))
.subscribe(null, (error) -> log.error("Unexpected error in {}-check", name, error));
.log(log.getName(), Level.FINEST) //
.subscribeOn(this.scheduler) //
// Allow concurrent check cycles if previous is slow
.flatMap((i) -> this.checkAllInstances(), Math.max(1, Runtime.getRuntime().availableProcessors() / 2))
.retryWhen(createRetrySpec())
.subscribe(null, (Throwable error) -> log.error("Unexpected error in {}-check", this.name, error));
}

/**
 * Builds the retry strategy for the check pipeline: an effectively unlimited number
 * of attempts with exponential backoff, starting at one second and capped at
 * {@code maxBackoff}. Before every retry the current {@code retryConsumer} is handed
 * the failure that triggered it.
 * @return the retry specification used by {@code start()}
 */
private Retry createRetrySpec() {
    final Duration initialBackoff = Duration.ofSeconds(1);
    return Retry.backoff(Long.MAX_VALUE, initialBackoff)
        .maxBackoff(this.maxBackoff)
        .doBeforeRetry((retrySignal) -> {
            // Read the consumer at retry time so a consumer set after start() is honoured.
            Throwable failure = retrySignal.failure();
            this.retryConsumer.accept(failure);
        });
}

public void markAsChecked(InstanceId instanceId) {
Expand All @@ -104,6 +120,10 @@ protected Mono<Void> checkAllInstances() {
.then();
}

/**
 * Exposes the bookkeeping of last check times without allowing modification.
 * @return an unmodifiable view of the instance-id to last-checked-time map
 */
protected Map<InstanceId, Instant> getLastChecked() {
    Map<InstanceId, Instant> readOnlyView = Collections.unmodifiableMap(this.lastChecked);
    return readOnlyView;
}

public void stop() {
if (this.subscription != null) {
this.subscription.dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,28 @@
package de.codecentric.boot.admin.server.services;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import reactor.core.publisher.Mono;

import de.codecentric.boot.admin.server.domain.values.InstanceId;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -106,9 +116,133 @@ void should_check_after_error() {
.untilAsserted(() -> verify(this.checkFn, atLeast(2)).apply(InstanceId.of("Test")));
}

/**
 * Runs a check whose every second invocation takes longer than the check interval
 * and verifies that none of the failures handed to the retry consumer is an
 * overflow error, neither directly nor as the cause of the reported failure.
 */
@Test
void should_not_overflow_when_checks_timeout_randomly() {
    Duration checkInterval = Duration.ofMillis(500);

    @SuppressWarnings("unchecked")
    Function<InstanceId, Mono<Void>> timeoutCheckFn = mock(Function.class);

    java.util.concurrent.atomic.AtomicInteger invocationCount = new java.util.concurrent.atomic.AtomicInteger(0);
    doAnswer((invocation) -> {
        if (invocationCount.getAndIncrement() % 2 == 0) {
            // Succeed quickly on even invocations
            return Mono.empty();
        }
        else {
            // Take longer than the check interval on odd invocations
            return Mono.just("slow response").delayElement(checkInterval.plus(Duration.ofSeconds(1))).then();
        }
    }).when(timeoutCheckFn).apply(any());

    IntervalCheck timeoutCheck = new IntervalCheck("overflow-test", timeoutCheckFn, checkInterval, checkInterval,
            Duration.ofSeconds(1));

    List<Throwable> retryErrors = new CopyOnWriteArrayList<>();

    timeoutCheck.setRetryConsumer(retryErrors::add);
    timeoutCheck.markAsChecked(INSTANCE_ID);
    timeoutCheck.start();
    try {
        // Wait until the check ran several times, i.e. at least one slow invocation
        // was started and further interval ticks arrived afterwards. Waiting on the
        // (initially empty) error list would succeed on the first poll, before any
        // check has been executed.
        await().atMost(Duration.ofSeconds(10))
            .untilAsserted(() -> verify(timeoutCheckFn, atLeast(3)).apply(any()));

        // Neither the reported failure itself nor its cause may be an overflow.
        assertThat(retryErrors).noneMatch((Throwable error) -> {
            Throwable cause = error.getCause();
            return "OverflowException".equalsIgnoreCase(error.getClass().getSimpleName())
                    || (cause != null && "OverflowException".equalsIgnoreCase(cause.getClass().getSimpleName()));
        });
    }
    finally {
        timeoutCheck.stop();
    }
}

/**
 * Starts a check with a 100 ms interval whose check function records the time of
 * every invocation, and expects at least five invocations within two seconds.
 */
@Test
void should_not_lose_checks_under_backpressure() {
    final Duration tickInterval = Duration.ofMillis(100);
    final Duration retention = Duration.ofMillis(50);
    final Duration backoffLimit = Duration.ofSeconds(1);

    @SuppressWarnings("unchecked")
    Function<InstanceId, Mono<Void>> recordingCheckFn = mock(Function.class);
    IntervalCheck check = new IntervalCheck("backpressure-test", recordingCheckFn, tickInterval, retention,
            backoffLimit);

    // Timestamps of all invocations of the check function.
    List<Long> invocationTimestamps = new CopyOnWriteArrayList<>();
    doAnswer((call) -> {
        invocationTimestamps.add(System.currentTimeMillis());
        return Mono.empty();
    }).when(recordingCheckFn).apply(any());

    check.markAsChecked(INSTANCE_ID);
    check.start();

    try {
        await().atMost(Duration.ofSeconds(2)).until(() -> invocationTimestamps.size() >= 5);
        // Enough ticks were turned into checks while the pipeline was running.
        assertThat(invocationTimestamps).hasSizeGreaterThanOrEqualTo(5);
    }
    finally {
        check.stop();
    }
}

/**
 * Registers 50 instances with a check function that alternates between completing
 * immediately and completing only after the check interval plus 500 ms. Expects at
 * least 500 invocations in total within five seconds and at least ten invocations
 * for every single instance.
 */
@Test
void should_not_lose_checks_under_backpressure_latest() {
    final Duration tickInterval = Duration.ofMillis(100);

    @SuppressWarnings("unchecked")
    Function<InstanceId, Mono<Void>> alternatingCheckFn = mock(Function.class);

    IntervalCheck check = new IntervalCheck("backpressure-test", alternatingCheckFn, tickInterval, tickInterval,
            Duration.ofSeconds(1));

    // Register many instances so each cycle has a noticeable amount of work.
    Set<InstanceId> registeredIds = IntStream.range(0, 50)
        .mapToObj((index) -> InstanceId.of("Test" + index))
        .collect(Collectors.toSet());
    registeredIds.forEach(check::markAsChecked);

    List<Long> allTimestamps = new CopyOnWriteArrayList<>();
    Map<String, List<Long>> timestampsByInstance = new ConcurrentHashMap<>();

    java.util.concurrent.atomic.AtomicInteger callCounter = new java.util.concurrent.atomic.AtomicInteger(0);
    doAnswer((call) -> {
        long now = System.currentTimeMillis();
        String key = instanceIdString(call);
        timestampsByInstance.computeIfAbsent(key, (String ignored) -> new CopyOnWriteArrayList<>()).add(now);
        allTimestamps.add(now);
        boolean slowTurn = callCounter.getAndIncrement() % 2 != 0;
        if (slowTurn) {
            // Every second invocation outlasts the check interval.
            return Mono.delay(tickInterval.plus(Duration.ofMillis(500))).then();
        }
        // The other invocations complete right away.
        return Mono.empty();
    }).when(alternatingCheckFn).apply(any());

    check.start();

    try {
        await().atMost(Duration.ofSeconds(5)).until(() -> allTimestamps.size() >= 500);
        // Every registered instance must have been checked repeatedly.
        for (InstanceId registeredId : registeredIds) {
            assertThat(timestampsByInstance.get(registeredId.getValue())).hasSizeGreaterThanOrEqualTo(10);
        }
    }
    finally {
        check.stop();
    }
}

/**
 * Stops the shared {@code intervalCheck} of this test class after every test.
 */
@AfterEach
void tearDown() {
this.intervalCheck.stop();
}

/**
 * Returns the string form of the first argument the mocked check function was
 * invoked with.
 * @param invocation the intercepted mock invocation
 * @return {@code toString()} of the first invocation argument
 */
private static String instanceIdString(InvocationOnMock invocation) {
    Object[] arguments = invocation.getArguments();
    Object firstArgument = arguments[0];
    return firstArgument.toString();
}

}