diff --git a/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java b/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java
index 9f982e13fcf..ff82aa27667 100644
--- a/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java
+++ b/spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java
@@ -20,12 +20,15 @@
 import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.logging.Level;
 
 import lombok.Getter;
+import lombok.NonNull;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Publisher;
 import org.springframework.lang.Nullable;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
@@ -68,9 +71,14 @@ public class IntervalCheck {
     @Nullable
     private Scheduler scheduler;
 
+    @Setter
+    @NonNull
+    private Consumer<Throwable> retryConsumer;
+
     public IntervalCheck(String name, Function<InstanceId, Mono<Void>> checkFn, Duration interval, Duration minRetention,
             Duration maxBackoff) {
         this.name = name;
+        this.retryConsumer = (Throwable throwable) -> log.warn("Unexpected error in {}-check", this.name, throwable);
         this.checkFn = checkFn;
         this.interval = interval;
         this.minRetention = minRetention;
@@ -80,21 +88,29 @@
     public void start() {
         this.scheduler = Schedulers.newSingle(this.name + "-check");
         this.subscription = Flux.interval(this.interval)
+            // ensure the most recent interval tick is always processed, preventing
+            // lost checks under overload.
+            .onBackpressureLatest()
             .doOnSubscribe((s) -> log.debug("Scheduled {}-check every {}", this.name, this.interval))
-            .log(log.getName(), Level.FINEST)
-            .subscribeOn(this.scheduler)
-            .concatMap((i) -> this.checkAllInstances())
-            .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
-                .maxBackoff(maxBackoff)
-                .doBeforeRetry((s) -> log.warn("Unexpected error in {}-check", this.name, s.failure())))
-            .subscribe(null, (error) -> log.error("Unexpected error in {}-check", name, error));
+            .log(log.getName(), Level.FINEST) //
+            .subscribeOn(this.scheduler) //
+            // Allow concurrent check cycles if previous is slow
+            .flatMap((i) -> this.checkAllInstances(), Math.max(1, Runtime.getRuntime().availableProcessors() / 2))
+            .retryWhen(createRetrySpec())
+            .subscribe(null, (Throwable error) -> log.error("Unexpected error in {}-check", this.name, error));
+    }
+
+    private Retry createRetrySpec() {
+        return Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
+            .maxBackoff(maxBackoff)
+            .doBeforeRetry((s) -> this.retryConsumer.accept(s.failure()));
     }
 
     public void markAsChecked(InstanceId instanceId) {
         this.lastChecked.put(instanceId, Instant.now());
     }
 
-    protected Mono<Void> checkAllInstances() {
+    protected Publisher<Void> checkAllInstances() {
         log.debug("check {} for all instances", this.name);
         Instant expiration = Instant.now().minus(this.minRetention);
         return Flux.fromIterable(this.lastChecked.entrySet())
diff --git a/spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/IntervalCheckTest.java b/spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/IntervalCheckTest.java
index 422339102e2..9792e03c4f0 100644
--- a/spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/IntervalCheckTest.java
+++ b/spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/IntervalCheckTest.java
@@ -17,18 +17,28 @@
 package de.codecentric.boot.admin.server.services;
 
 import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
 import reactor.core.publisher.Mono;
 
 import de.codecentric.boot.admin.server.domain.values.InstanceId;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -106,9 +116,133 @@ void should_check_after_error() {
             .untilAsserted(() -> verify(this.checkFn, atLeast(2)).apply(InstanceId.of("Test")));
     }
 
+    @Test
+    void should_not_overflow_when_checks_timeout_randomly() {
+        Duration CHECK_INTERVAL = Duration.ofMillis(500);
+
+        @SuppressWarnings("unchecked")
+        Function<InstanceId, Mono<Void>> timeoutCheckFn = mock(Function.class);
+
+        java.util.concurrent.atomic.AtomicInteger invocationCount = new java.util.concurrent.atomic.AtomicInteger(0);
+        doAnswer((invocation) -> {
+            if (invocationCount.getAndIncrement() % 2 == 0) {
+                // Succeed quickly on even invocations
+                return Mono.empty();
+            }
+            else {
+                // Timeout on odd invocations
+                return Mono.just("slow response").delayElement(CHECK_INTERVAL.plus(Duration.ofSeconds(1))).then();
+            }
+        }).when(timeoutCheckFn).apply(any());
+
+        IntervalCheck timeoutCheck = new IntervalCheck("overflow-test", timeoutCheckFn, CHECK_INTERVAL, CHECK_INTERVAL,
+                Duration.ofSeconds(1));
+
+        List<Throwable> retryErrors = new CopyOnWriteArrayList<>();
+
+        timeoutCheck.setRetryConsumer(retryErrors::add);
+        timeoutCheck.markAsChecked(INSTANCE_ID);
+        timeoutCheck.start();
+        try {
+            await().atMost(Duration.ofSeconds(5))
+                .until(() -> retryErrors.stream()
+                    .noneMatch((Throwable er) -> "OverflowException".equalsIgnoreCase(er.getClass().getSimpleName())));
+
+            assertThat(retryErrors).noneMatch((Throwable e) -> e.getCause() != null
+                    && "OverflowException".equalsIgnoreCase(e.getCause().getClass().getSimpleName()));
+        }
+        finally {
+            timeoutCheck.stop();
+        }
+    }
+
+    @Test
+    void should_not_lose_checks_under_backpressure() {
+        Duration CHECK_INTERVAL = Duration.ofMillis(100);
+
+        @SuppressWarnings("unchecked")
+        Function<InstanceId, Mono<Void>> slowCheckFn = mock(Function.class);
+        IntervalCheck slowCheck = new IntervalCheck("backpressure-test", slowCheckFn, CHECK_INTERVAL,
+                Duration.ofMillis(50), Duration.ofSeconds(1));
+
+        List<Long> checkTimes = new CopyOnWriteArrayList<>();
+        doAnswer((invocation) -> {
+            checkTimes.add(System.currentTimeMillis());
+            return Mono.empty();
+        }).when(slowCheckFn).apply(any());
+
+        slowCheck.markAsChecked(INSTANCE_ID);
+        slowCheck.start();
+
+        try {
+            await().atMost(Duration.ofSeconds(2)).until(() -> checkTimes.size() >= 5);
+            // With onBackpressureLatest, we should have processed multiple checks without
+            // drops
+            assertThat(checkTimes).hasSizeGreaterThanOrEqualTo(5);
+        }
+        finally {
+            slowCheck.stop();
+        }
+    }
+
+    @Test
+    void should_not_lose_checks_under_backpressure_latest() {
+        Duration CHECK_INTERVAL = Duration.ofMillis(100);
+
+        @SuppressWarnings("unchecked")
+        Function<InstanceId, Mono<Void>> slowCheckFn = mock(Function.class);
+
+        IntervalCheck slowCheck = new IntervalCheck("backpressure-test", slowCheckFn, CHECK_INTERVAL, CHECK_INTERVAL,
+                Duration.ofSeconds(1));
+
+        // Add multiple instances to increase load and cause drops
+        Set<InstanceId> instanceIds = IntStream.range(0, 50)
+            .mapToObj((i) -> InstanceId.of("Test" + i))
+            .collect(Collectors.toSet());
+
+        instanceIds.forEach((InstanceId instanceId) -> slowCheck.markAsChecked(instanceId));
+
+        List<Long> checkTimes = new CopyOnWriteArrayList<>();
+        Map<String, List<Long>> checkTimesPerInstance = new ConcurrentHashMap<>();
+
+        java.util.concurrent.atomic.AtomicInteger invocationCount = new java.util.concurrent.atomic.AtomicInteger(0);
+        doAnswer((invocation) -> {
+            long checkTime = System.currentTimeMillis();
+            String instanceId = instanceIdString(invocation);
+            List<Long> checkTimesInstance = checkTimesPerInstance.computeIfAbsent(instanceId,
+                    (String k) -> new CopyOnWriteArrayList<>());
+            checkTimesInstance.add(checkTime);
+            checkTimes.add(checkTime);
+            if (invocationCount.getAndIncrement() % 2 == 0) {
+                // Sometimes succeed quickly
+                return Mono.empty();
+            }
+            else {
+                // Sometimes slow
+                return Mono.delay(CHECK_INTERVAL.plus(Duration.ofMillis(500))).then();
+            }
+        }).when(slowCheckFn).apply(any());
+
+        slowCheck.start();
+
+        try {
+            await().atMost(Duration.ofSeconds(5)).until(() -> checkTimes.size() >= 500);
+            // With onBackpressureLatest, we should process more checks without drops
+            instanceIds.forEach((InstanceId instanceId) -> assertThat(checkTimesPerInstance.get(instanceId.getValue()))
+                .hasSizeGreaterThanOrEqualTo(10));
+        }
+        finally {
+            slowCheck.stop();
+        }
+    }
+
     @AfterEach
     void tearDown() {
         this.intervalCheck.stop();
     }
 
+    private static String instanceIdString(InvocationOnMock invocation) {
+        return invocation.getArguments()[0].toString();
+    }
+
 }
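Usage note (illustrative sketch, not part of the patch): the retryConsumer hook added above defaults to the previous warn-level logging, but can now be replaced via the Lombok-generated setter, mirroring how the new tests wire it up. The check name, intervals, instance id and the trivial check function below are placeholder values, not taken from the patch.

    Function<InstanceId, Mono<Void>> checkFn = (instanceId) -> Mono.empty(); // hypothetical check function
    IntervalCheck statusCheck = new IntervalCheck("status", checkFn, Duration.ofSeconds(10), Duration.ofSeconds(10),
            Duration.ofSeconds(10));
    List<Throwable> retryCauses = new CopyOnWriteArrayList<>();
    statusCheck.setRetryConsumer(retryCauses::add); // replaces the default warn-logging consumer
    statusCheck.markAsChecked(InstanceId.of("app-1"));
    statusCheck.start();
    // ... inspect retryCauses as needed, then shut the check down
    statusCheck.stop();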