Skip to content

Commit 20447df

Browse files
committed
USe different consumer group for no ack test
1 parent 1df4f92 commit 20447df

File tree

1 file changed

+7
-10
lines changed

1 file changed

+7
-10
lines changed

spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducerTests.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -193,16 +193,13 @@ public void testReadingPendingMessageWithNoAutoACK() {
193193
Address address = new Address("Winterfell, Westeros");
194194
Person person = new Person(address, "John Snow");
195195

196-
this.template.opsForStream()
197-
.createGroup(STREAM_KEY, this.redisStreamMessageProducer.getBeanName())
198-
.as(StepVerifier::create)
199-
.assertNext(message -> assertThat(message).isEqualTo("OK"))
200-
.thenCancel()
201-
.verify(Duration.ofSeconds(10));
196+
String consumerGroup = "testGroup";
197+
String consumerName = "testConsumer";
202198

203199
this.redisStreamMessageProducer.setCreateConsumerGroup(true);
204200
this.redisStreamMessageProducer.setAutoAck(false);
205-
this.redisStreamMessageProducer.setConsumerName(CONSUMER);
201+
this.redisStreamMessageProducer.setConsumerGroup(consumerGroup);
202+
this.redisStreamMessageProducer.setConsumerName(consumerName);
206203
this.redisStreamMessageProducer.setReadOffset(ReadOffset.latest());
207204
this.redisStreamMessageProducer.afterPropertiesSet();
208205
this.redisStreamMessageProducer.start();
@@ -227,14 +224,14 @@ public void testReadingPendingMessageWithNoAutoACK() {
227224

228225
await().until(() ->
229226
template.opsForStream()
230-
.pending(STREAM_KEY, this.redisStreamMessageProducer.getBeanName())
227+
.pending(STREAM_KEY, consumerGroup)
231228
.block(Duration.ofMillis(100))
232229
.getTotalPendingMessages() == 1);
233230

234231
acknowledgmentReference.get().acknowledge();
235232

236-
Mono<PendingMessagesSummary> pendingZeroMessage = template.opsForStream().pending(STREAM_KEY,
237-
this.redisStreamMessageProducer.getBeanName());
233+
Mono<PendingMessagesSummary> pendingZeroMessage =
234+
template.opsForStream().pending(STREAM_KEY, consumerGroup);
238235

239236
StepVerifier.create(pendingZeroMessage)
240237
.assertNext(pendingMessagesSummary ->

0 commit comments

Comments
 (0)