
Commit 5734b2f

Authored by ulischulte and Copilot

Added backpressure handling to prevent memory buildup and system degradation under high load (#4784)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

1 parent a42c707 · commit 5734b2f

2 files changed: +158 -8 lines changed

spring-boot-admin-server/src/main/java/de/codecentric/boot/admin/server/services/IntervalCheck.java

Lines changed: 24 additions & 8 deletions
@@ -20,12 +20,15 @@
 import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.logging.Level;

 import lombok.Getter;
+import lombok.NonNull;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Publisher;
 import org.springframework.lang.Nullable;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
@@ -68,9 +71,14 @@ public class IntervalCheck {
 	@Nullable
 	private Scheduler scheduler;

+	@Setter
+	@NonNull
+	private Consumer<Throwable> retryConsumer;
+
 	public IntervalCheck(String name, Function<InstanceId, Mono<Void>> checkFn, Duration interval,
 			Duration minRetention, Duration maxBackoff) {
 		this.name = name;
+		this.retryConsumer = (Throwable throwable) -> log.warn("Unexpected error in {}-check", this.name, throwable);
 		this.checkFn = checkFn;
 		this.interval = interval;
 		this.minRetention = minRetention;
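The new retryConsumer field defaults to the warn-logging that was previously hard-coded in the retry spec, and the Lombok-generated setter makes it pluggable; the tests below use it to capture retry-triggering failures. A minimal sketch of a caller overriding it (the demo class and checkFn are invented for illustration, not part of this commit):

import java.time.Duration;
import java.util.function.Function;

import reactor.core.publisher.Mono;

import de.codecentric.boot.admin.server.domain.values.InstanceId;
import de.codecentric.boot.admin.server.services.IntervalCheck;

// Hypothetical caller, not part of this commit: swaps the default
// warn-logging retry consumer for a custom handler.
public class RetryConsumerExample {

	public static void main(String[] args) {
		Function<InstanceId, Mono<Void>> checkFn = (instanceId) -> Mono.empty();
		IntervalCheck check = new IntervalCheck("status", checkFn, Duration.ofSeconds(10),
				Duration.ofSeconds(10), Duration.ofMinutes(1));
		// route failures that trigger a retry to stderr instead of the log
		check.setRetryConsumer((failure) -> System.err.println("status-check failed: " + failure));
		check.start();
	}
}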
@@ -80,21 +88,29 @@ public IntervalCheck(String name, Function<InstanceId, Mono<Void>> checkFn, Dura
 	public void start() {
 		this.scheduler = Schedulers.newSingle(this.name + "-check");
 		this.subscription = Flux.interval(this.interval)
+			// ensure the most recent interval tick is always processed, preventing
+			// lost checks under overload.
+			.onBackpressureLatest()
 			.doOnSubscribe((s) -> log.debug("Scheduled {}-check every {}", this.name, this.interval))
-			.log(log.getName(), Level.FINEST)
-			.subscribeOn(this.scheduler)
-			.concatMap((i) -> this.checkAllInstances())
-			.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
-				.maxBackoff(maxBackoff)
-				.doBeforeRetry((s) -> log.warn("Unexpected error in {}-check", this.name, s.failure())))
-			.subscribe(null, (error) -> log.error("Unexpected error in {}-check", name, error));
+			.log(log.getName(), Level.FINEST) //
+			.subscribeOn(this.scheduler) //
+			// Allow concurrent check cycles if previous is slow
+			.flatMap((i) -> this.checkAllInstances(), Math.max(1, Runtime.getRuntime().availableProcessors() / 2))
+			.retryWhen(createRetrySpec())
+			.subscribe(null, (Throwable error) -> log.error("Unexpected error in {}-check", this.name, error));
+	}
+
+	private Retry createRetrySpec() {
+		return Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
+			.maxBackoff(maxBackoff)
+			.doBeforeRetry((s) -> this.retryConsumer.accept(s.failure()));
 	}

 	public void markAsChecked(InstanceId instanceId) {
 		this.lastChecked.put(instanceId, Instant.now());
 	}

-	protected Mono<Void> checkAllInstances() {
+	protected Publisher<Void> checkAllInstances() {
 		log.debug("check {} for all instances", this.name);
 		Instant expiration = Instant.now().minus(this.minRetention);
 		return Flux.fromIterable(this.lastChecked.entrySet())
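For context, a standalone sketch of the operator combination introduced above (invented demo class and timings, not from the repository): Flux.interval errors when a tick cannot be delivered to a slow subscriber, onBackpressureLatest instead discards stale ticks and keeps only the newest, and the bounded flatMap lets a slow check cycle overlap the next one instead of queueing behind it.

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Invented demo, not from the repository: 300 ms of work per 100 ms tick.
// onBackpressureLatest keeps the pipeline alive by discarding stale ticks;
// the printed tick numbers skip ahead instead of the stream erroring out.
public class BackpressureLatestDemo {

	public static void main(String[] args) throws InterruptedException {
		Flux.interval(Duration.ofMillis(100))
			// keep only the most recent undelivered tick instead of failing
			.onBackpressureLatest()
			// at most two check cycles in flight at once, mirroring the bounded flatMap above
			.flatMap((tick) -> slowCheck(tick), 2)
			.subscribe((tick) -> System.out.println("finished check for tick " + tick));

		Thread.sleep(2000);
	}

	// stands in for checkAllInstances(): a cycle slower than the tick interval
	private static Mono<Long> slowCheck(Long tick) {
		return Mono.delay(Duration.ofMillis(300)).thenReturn(tick);
	}
}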

spring-boot-admin-server/src/test/java/de/codecentric/boot/admin/server/services/IntervalCheckTest.java

Lines changed: 134 additions & 0 deletions
@@ -17,18 +17,28 @@
 package de.codecentric.boot.admin.server.services;

 import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;

 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
 import reactor.core.publisher.Mono;

 import de.codecentric.boot.admin.server.domain.values.InstanceId;

+import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -106,9 +116,133 @@ void should_check_after_error() {
 			.untilAsserted(() -> verify(this.checkFn, atLeast(2)).apply(InstanceId.of("Test")));
 	}

+	@Test
+	void should_not_overflow_when_checks_timeout_randomly() {
+		Duration CHECK_INTERVAL = Duration.ofMillis(500);
+
+		@SuppressWarnings("unchecked")
+		Function<InstanceId, Mono<Void>> timeoutCheckFn = mock(Function.class);
+
+		java.util.concurrent.atomic.AtomicInteger invocationCount = new java.util.concurrent.atomic.AtomicInteger(0);
+		doAnswer((invocation) -> {
+			if (invocationCount.getAndIncrement() % 2 == 0) {
+				// Succeed quickly on even invocations
+				return Mono.empty();
+			}
+			else {
+				// Timeout on odd invocations
+				return Mono.just("slow response").delayElement(CHECK_INTERVAL.plus(Duration.ofSeconds(1))).then();
+			}
+		}).when(timeoutCheckFn).apply(any());
+
+		IntervalCheck timeoutCheck = new IntervalCheck("overflow-test", timeoutCheckFn, CHECK_INTERVAL, CHECK_INTERVAL,
+				Duration.ofSeconds(1));
+
+		List<Throwable> retryErrors = new CopyOnWriteArrayList<>();
+
+		timeoutCheck.setRetryConsumer(retryErrors::add);
+		timeoutCheck.markAsChecked(INSTANCE_ID);
+		timeoutCheck.start();
+		try {
+			await().atMost(Duration.ofSeconds(5))
+				.until(() -> retryErrors.stream()
+					.noneMatch((Throwable er) -> "OverflowException".equalsIgnoreCase(er.getClass().getSimpleName())));
+
+			assertThat(retryErrors).noneMatch((Throwable e) -> e.getCause() != null
+					&& "OverflowException".equalsIgnoreCase(e.getCause().getClass().getSimpleName()));
+		}
+		finally {
+			timeoutCheck.stop();
+		}
+	}
+
+	@Test
+	void should_not_lose_checks_under_backpressure() {
+		Duration CHECK_INTERVAL = Duration.ofMillis(100);
+
+		@SuppressWarnings("unchecked")
+		Function<InstanceId, Mono<Void>> slowCheckFn = mock(Function.class);
+		IntervalCheck slowCheck = new IntervalCheck("backpressure-test", slowCheckFn, CHECK_INTERVAL,
+				Duration.ofMillis(50), Duration.ofSeconds(1));
+
+		List<Long> checkTimes = new CopyOnWriteArrayList<>();
+		doAnswer((invocation) -> {
+			checkTimes.add(System.currentTimeMillis());
+			return Mono.empty();
+		}).when(slowCheckFn).apply(any());
+
+		slowCheck.markAsChecked(INSTANCE_ID);
+		slowCheck.start();
+
+		try {
+			await().atMost(Duration.ofSeconds(2)).until(() -> checkTimes.size() >= 5);
+			// With onBackpressureLatest, we should have processed multiple checks without
+			// drops
+			assertThat(checkTimes).hasSizeGreaterThanOrEqualTo(5);
+		}
+		finally {
+			slowCheck.stop();
+		}
+	}
+
+	@Test
+	void should_not_lose_checks_under_backpressure_latest() {
+		Duration CHECK_INTERVAL = Duration.ofMillis(100);
+
+		@SuppressWarnings("unchecked")
+		Function<InstanceId, Mono<Void>> slowCheckFn = mock(Function.class);
+
+		IntervalCheck slowCheck = new IntervalCheck("backpressure-test", slowCheckFn, CHECK_INTERVAL, CHECK_INTERVAL,
+				Duration.ofSeconds(1));
+
+		// Add multiple instances to increase load and cause drops
+		Set<InstanceId> instanceIds = IntStream.range(0, 50)
+			.mapToObj((i) -> InstanceId.of("Test" + i))
+			.collect(Collectors.toSet());
+
+		instanceIds.forEach((InstanceId instanceId) -> slowCheck.markAsChecked(instanceId));
+
+		List<Long> checkTimes = new CopyOnWriteArrayList<>();
+		Map<String, List<Long>> checkTimesPerInstance = new ConcurrentHashMap<>();
+
+		java.util.concurrent.atomic.AtomicInteger invocationCount = new java.util.concurrent.atomic.AtomicInteger(0);
+		doAnswer((invocation) -> {
+			long checkTime = System.currentTimeMillis();
+			String instanceId = instanceIdString(invocation);
+			List<Long> checkTimesInstance = checkTimesPerInstance.computeIfAbsent(instanceId,
+					(String k) -> new CopyOnWriteArrayList<>());
+			checkTimesInstance.add(checkTime);
+			checkTimes.add(checkTime);
+			if (invocationCount.getAndIncrement() % 2 == 0) {
+				// Sometimes succeed quickly
+				return Mono.empty();
+			}
+			else {
+				// Sometimes slow
+				return Mono.delay(CHECK_INTERVAL.plus(Duration.ofMillis(500))).then();
+			}
+		}).when(slowCheckFn).apply(any());
+
+		slowCheck.start();
+
+		try {
+			await().atMost(Duration.ofSeconds(5)).until(() -> checkTimes.size() >= 500);
+			// With onBackpressureLatest, we should process more checks without drops
+			instanceIds.forEach((InstanceId instanceId) -> assertThat(checkTimesPerInstance.get(instanceId.getValue()))
+				.hasSizeGreaterThanOrEqualTo(10));
+		}
+		finally {
+			slowCheck.stop();
+		}
+	}
+
 	@AfterEach
 	void tearDown() {
 		this.intervalCheck.stop();
 	}

+	private static String instanceIdString(InvocationOnMock invocation) {
+		return invocation.getArguments()[0].toString();
+	}
+
 }
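The OverflowException the first test asserts against is the failure mode of the old concatMap pipeline: Flux.interval cannot pause, so a tick arriving when there is no outstanding demand has nowhere to go. A sketch of that behavior in isolation (invented demo, not from the repository; the prefetch is set to 1 so the overflow shows up within a few ticks instead of after concatMap's default internal buffer fills):

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Invented demo, not from the repository: without onBackpressureLatest,
// ticks that outpace the downstream kill the stream with an
// OverflowException ("Could not emit tick ... due to lack of requests").
public class IntervalOverflowDemo {

	public static void main(String[] args) throws InterruptedException {
		Flux.interval(Duration.ofMillis(100))
			// concatMap processes one tick at a time; prefetch 1 keeps its
			// internal buffer tiny so the overflow is quick to reproduce
			.concatMap((tick) -> Mono.delay(Duration.ofMillis(500)).thenReturn(tick), 1)
			.subscribe((tick) -> System.out.println("completed " + tick),
					(error) -> System.err.println("pipeline died: " + error));

		Thread.sleep(3000);
	}
}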
