Skip to content

Commit 1b45fe7

Browse files
committed
Add wait for container stop on cancellation
1 parent 0c0a5d8 commit 1b45fe7

File tree

2 files changed

+32
-0
lines changed

2 files changed

+32
-0
lines changed

src/main/java/org/springframework/data/redis/stream/StreamPollTask.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
* {@link Task} that invokes a {@link BiFunction read function} to poll on a Redis Stream.
4242
*
4343
* @author Mark Paluch
44+
* @author JiHongKim98
4445
* @see 2.2
4546
*/
4647
class StreamPollTask<K, V extends Record<K, ?>> implements Task {
@@ -54,6 +55,8 @@ class StreamPollTask<K, V extends Record<K, ?>> implements Task {
5455
private final PollState pollState;
5556
private final TypeDescriptor targetType;
5657

58+
private final CountDownLatch cancelLatch = new CountDownLatch(1);
59+
5760
private volatile boolean isInEventLoop = false;
5861

5962
StreamPollTask(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener, ErrorHandler errorHandler,
@@ -83,6 +86,12 @@ private static PollState createPollState(StreamReadRequest<?> streamRequest) {
8386
@Override
8487
public void cancel() throws DataAccessResourceFailureException {
8588
this.pollState.cancel();
89+
90+
try {
91+
cancelLatch.await();
92+
} catch (InterruptedException e) {
93+
Thread.currentThread().interrupt();
94+
}
8695
}
8796

8897
@Override
@@ -112,6 +121,7 @@ public void run() {
112121
doLoop();
113122
} finally {
114123
isInEventLoop = false;
124+
cancelLatch.countDown();
115125
}
116126
}
117127

@@ -134,6 +144,7 @@ private void doLoop() {
134144
} catch (RuntimeException ex) {
135145

136146
if (cancelSubscriptionOnError.test(ex)) {
147+
cancelLatch.countDown();
137148
cancel();
138149
}
139150

src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,27 @@ void containerRestartShouldRestartSubscription() throws InterruptedException {
384384
cancelAwait(subscription);
385385
}
386386

387+
@Test // GH-2261
388+
void containerShouldStopGracefully() throws InterruptedException {
389+
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
390+
.create(connectionFactory, containerOptions);
391+
392+
BlockingQueue<MapRecord<String, String, String>> queue = new LinkedBlockingQueue<>();
393+
container.start();
394+
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), r -> {
395+
try {
396+
Thread.sleep(1500);
397+
} catch (InterruptedException e) {
398+
// ignore
399+
}
400+
queue.add(r);
401+
});
402+
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value1"));
403+
subscription.await(DEFAULT_TIMEOUT);
404+
container.stop();
405+
assertThat(queue.poll(500, TimeUnit.MILLISECONDS)).isNotNull();
406+
}
407+
387408
private static void cancelAwait(Subscription subscription) {
388409

389410
subscription.cancel();

0 commit comments

Comments
 (0)