Skip to content

Commit ed3899f

Browse files
authored
Make subscriptionType nullable on @Reactive/PulsarListener (#489)
See #488
1 parent 591fb7d commit ed3899f

File tree

11 files changed

+373
-30
lines changed

11 files changed

+373
-30
lines changed

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar.adoc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,15 @@ NOTE: If you are not using the starter, you will need to configure and register
149149
When it comes to Pulsar consumers, we recommend that end-user applications use the `PulsarListener` annotation.
150150
To use `PulsarListener`, you need to use the `@EnablePulsar` annotation.
151151
When you use Spring Boot support, it automatically enables this annotation and configures all the components necessary for `PulsarListener`, such as the message listener infrastructure (which is responsible for creating the Pulsar consumer).
152-
`PulsarMessageListenerContainer` uses a `PulsarConsumerFactory` to create and manage the Pulsar consumer.
152+
`PulsarMessageListenerContainer` uses a `PulsarConsumerFactory` to create and manage the Pulsar consumer the underlying Pulsar consumer that it uses to consume messages.
153153

154-
Spring Boot auto-configuration also provides this consumer factory which you can further configure by specifying **most** of the {spring-boot-pulsar-config-props}[`spring.pulsar.consumer.*`] application properties.
154+
Spring Boot provides this consumer factory which you can further configure by specifying the {spring-boot-pulsar-config-props}[`spring.pulsar.consumer.*`] application properties.
155+
**Most** of the configured properties on the factory will be respected in the listener with the following **exceptions**:
156+
157+
TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation.
158+
159+
TIP: The `spring.pulsar.consumer.subscription-type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default.
155160

156-
NOTE: `spring.pulsar.consumer.subscription.name` is ignored and is instead generated when not specified on the annotation.
157161

158162
Let us revisit the `PulsarListener` code snippet we saw in the quick-tour section:
159163

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar.adoc

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ Mono<Void> listen(String message) {
159159
}
160160
----
161161

162-
In this most basic form, when the `subscriptionName` is not provided on the `@ReactivePulsarListener` annotation an auto-generated subscription name will be used.
163-
Likewise, when the `topics` are not directly provided, a <<topic-resolution-process-reactive,topic resolution process>> is used to determine the destination topic.
162+
In this most basic form, when the `topics` are not directly provided, a <<topic-resolution-process-reactive,topic resolution process>> is used to determine the destination topic.
163+
Likewise, when the `subscriptionName` is not provided on the `@ReactivePulsarListener` annotation an auto-generated subscription name will be used.
164164

165165
In the `ReactivePulsarListener` method shown earlier, we receive the data as `String`, but we do not specify any schema types.
166166
Internally, the framework relies on Pulsar's schema mechanism to convert the data to the required type.
@@ -238,9 +238,14 @@ Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo
238238
----
239239

240240
==== Configuration - Application Properties
241-
The listener ultimately relies on `ReactivePulsarConsumerFactory` to create and manage the underlying Pulsar consumer.
241+
The listener relies on the `ReactivePulsarConsumerFactory` to create and manage the underlying Pulsar consumer that it uses to consume messages.
242+
Spring Boot provides this consumer factory which you can further configure by specifying the {spring-boot-pulsar-config-props}[`spring.pulsar.consumer.*`] application properties.
243+
**Most** of the configured properties on the factory will be respected in the listener with the following **exceptions**:
244+
245+
TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation.
246+
247+
TIP: The `spring.pulsar.consumer.subscription-type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default.
242248

243-
Spring Boot provides this consumer factory which can be configured with any of the {spring-boot-pulsar-config-props}[`spring.pulsar.consumer.*`] application-properties.
244249

245250
[[reactive-consumer-customizer]]
246251
==== Consumer Customization

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListener.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,12 @@
7272
String subscriptionName() default "";
7373

7474
/**
75-
* Pulsar subscription type for this listener.
76-
* @return the {@code subscriptionType} for this listener
75+
* Pulsar subscription type for this listener - expected to be a single element array
76+
* with subscription type or empty array to indicate null type.
77+
* @return single element array with the subscription type or empty array to indicate
78+
* no type chosen by user
7779
*/
78-
SubscriptionType subscriptionType() default SubscriptionType.Exclusive;
80+
SubscriptionType[] subscriptionType() default { SubscriptionType.Exclusive };
7981

8082
/**
8183
* Pulsar schema type for this listener.

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
153153
});
154154
if (annotatedMethods.isEmpty()) {
155155
this.nonAnnotatedClasses.add(bean.getClass());
156-
this.logger.trace(() -> "No @PulsarListener annotations found on bean type: " + bean.getClass());
156+
this.logger
157+
.trace(() -> "No @ReactivePulsarListener annotations found on bean type: " + bean.getClass());
157158
}
158159
else {
159160
// Non-empty set of methods
@@ -236,7 +237,7 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene
236237
endpoint.setId(getEndpointId(reactivePulsarListener));
237238
endpoint.setTopics(topics);
238239
endpoint.setTopicPattern(topicPattern);
239-
endpoint.setSubscriptionType(reactivePulsarListener.subscriptionType());
240+
resolveSubscriptionType(endpoint, reactivePulsarListener);
240241
endpoint.setSchemaType(reactivePulsarListener.schemaType());
241242

242243
String concurrency = reactivePulsarListener.concurrency();
@@ -260,6 +261,15 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene
260261
resolveConsumerCustomizer(endpoint, reactivePulsarListener);
261262
}
262263

264+
private void resolveSubscriptionType(MethodReactivePulsarListenerEndpoint<?> endpoint,
265+
ReactivePulsarListener reactivePulsarListener) {
266+
Assert.state(reactivePulsarListener.subscriptionType().length <= 1,
267+
() -> "ReactivePulsarListener.subscriptionType must have 0 or 1 elements");
268+
if (reactivePulsarListener.subscriptionType().length == 1) {
269+
endpoint.setSubscriptionType(reactivePulsarListener.subscriptionType()[0]);
270+
}
271+
}
272+
263273
private void resolveDeadLetterPolicy(MethodReactivePulsarListenerEndpoint<?> endpoint,
264274
ReactivePulsarListener reactivePulsarListener) {
265275
Object deadLetterPolicy = resolveExpression(reactivePulsarListener.deadLetterPolicy());

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/ReactivePulsarContainerProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class ReactivePulsarContainerProperties<T> {
4343

4444
private String subscriptionName;
4545

46-
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
46+
private SubscriptionType subscriptionType;
4747

4848
private Schema<T> schema;
4949

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java

Lines changed: 175 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@
2020

2121
import java.nio.charset.StandardCharsets;
2222
import java.time.Duration;
23-
import java.util.Collections;
23+
import java.util.HashMap;
2424
import java.util.List;
25+
import java.util.Map;
2526
import java.util.Objects;
2627
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.CountDownLatch;
@@ -45,10 +46,13 @@
4546
import org.apache.pulsar.common.schema.SchemaType;
4647
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
4748
import org.apache.pulsar.reactive.client.api.MessageResult;
49+
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
50+
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
4851
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
4952
import org.junit.jupiter.api.Nested;
5053
import org.junit.jupiter.api.Test;
5154

55+
import org.springframework.beans.factory.ObjectProvider;
5256
import org.springframework.beans.factory.annotation.Autowired;
5357
import org.springframework.context.annotation.Bean;
5458
import org.springframework.context.annotation.Configuration;
@@ -71,13 +75,19 @@
7175
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
7276
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
7377
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory;
78+
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
7479
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
80+
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig;
7581
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SchemaCustomMappingsTestCases.SchemaCustomMappingsTestConfig.User2;
82+
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig;
83+
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig;
7684
import org.springframework.pulsar.support.PulsarHeaders;
7785
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
7886
import org.springframework.test.annotation.DirtiesContext;
7987
import org.springframework.test.context.ContextConfiguration;
8088
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
89+
import org.springframework.test.util.ReflectionTestUtils;
90+
import org.springframework.util.ObjectUtils;
8191

8292
import reactor.core.publisher.Flux;
8393
import reactor.core.publisher.Mono;
@@ -122,9 +132,14 @@ public PulsarTemplate<String> pulsarTemplate(PulsarProducerFactory<String> pulsa
122132
return new PulsarTemplate<>(pulsarProducerFactory);
123133
}
124134

135+
@SuppressWarnings("unchecked")
125136
@Bean
126-
public ReactivePulsarConsumerFactory<String> pulsarConsumerFactory(ReactivePulsarClient pulsarClient) {
127-
return new DefaultReactivePulsarConsumerFactory<>(pulsarClient, Collections.emptyList());
137+
public ConsumerTrackingReactivePulsarConsumerFactory<String> pulsarConsumerFactory(
138+
ReactivePulsarClient pulsarClient,
139+
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<String>> defaultConsumerCustomizersProvider) {
140+
DefaultReactivePulsarConsumerFactory<String> consumerFactory = new DefaultReactivePulsarConsumerFactory<>(
141+
pulsarClient, defaultConsumerCustomizersProvider.orderedStream().toList());
142+
return new ConsumerTrackingReactivePulsarConsumerFactory<>(consumerFactory);
128143
}
129144

130145
@Bean
@@ -721,7 +736,7 @@ Mono<Void> listenString(String ignored) {
721736
}
722737

723738
@Nested
724-
@ContextConfiguration(classes = ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig.class)
739+
@ContextConfiguration(classes = PulsarListenerWithHeadersConfig.class)
725740
class PulsarHeadersTest {
726741

727742
static CountDownLatch simpleListenerLatch = new CountDownLatch(1);
@@ -894,4 +909,160 @@ Mono<Void> listen2(String message) {
894909

895910
}
896911

912+
@Nested
913+
class SubscriptionTypeTests {
914+
915+
@Nested
916+
@ContextConfiguration(classes = WithDefaultTypeConfig.class)
917+
class WithDefaultType {
918+
919+
static final CountDownLatch latchTypeNotSet = new CountDownLatch(1);
920+
921+
@Test
922+
void whenTypeNotSetAnywhereThenFallbackTypeIsUsed(
923+
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
924+
assertThat(consumerFactory.topicNameToConsumerSpec).hasEntrySatisfying("rpl-typeNotSetAnywhere-topic",
925+
(consumerSpec) -> assertThat(consumerSpec.getSubscriptionType())
926+
.isEqualTo(SubscriptionType.Exclusive));
927+
pulsarTemplate.send("rpl-typeNotSetAnywhere-topic", "hello-rpl-typeNotSetAnywhere");
928+
assertThat(latchTypeNotSet.await(10, TimeUnit.SECONDS)).isTrue();
929+
}
930+
931+
@Configuration(proxyBeanMethods = false)
932+
static class WithDefaultTypeConfig {
933+
934+
@ReactivePulsarListener(topics = "rpl-typeNotSetAnywhere-topic",
935+
subscriptionName = "rpl-typeNotSetAnywhere-sub",
936+
consumerCustomizer = "subscriptionInitialPositionEarliest")
937+
Mono<Void> listenWithoutTypeSetAnywhere(String ignored) {
938+
latchTypeNotSet.countDown();
939+
return Mono.empty();
940+
}
941+
942+
}
943+
944+
}
945+
946+
@Nested
947+
@ContextConfiguration(classes = WithSpecificTypesConfig.class)
948+
class WithSpecificTypes {
949+
950+
static final CountDownLatch latchTypeSetConsumerFactory = new CountDownLatch(1);
951+
952+
static final CountDownLatch latchTypeSetAnnotation = new CountDownLatch(1);
953+
954+
static final CountDownLatch latchWithCustomizer = new CountDownLatch(1);
955+
956+
@Test
957+
void whenTypeSetOnlyInConsumerFactoryThenConsumerFactoryTypeIsUsed(
958+
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
959+
assertThat(consumerFactory.getSpec("rpl-typeSetConsumerFactory-topic"))
960+
.extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
961+
.isEqualTo(SubscriptionType.Shared);
962+
pulsarTemplate.send("rpl-typeSetConsumerFactory-topic", "hello-rpl-typeSetConsumerFactory");
963+
assertThat(latchTypeSetConsumerFactory.await(10, TimeUnit.SECONDS)).isTrue();
964+
}
965+
966+
@Test
967+
void whenTypeSetOnAnnotationThenAnnotationTypeIsUsed(
968+
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
969+
assertThat(consumerFactory.getSpec("rpl-typeSetAnnotation-topic"))
970+
.extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
971+
.isEqualTo(SubscriptionType.Key_Shared);
972+
pulsarTemplate.send("rpl-typeSetAnnotation-topic", "hello-rpl-typeSetAnnotation");
973+
assertThat(latchTypeSetAnnotation.await(10, TimeUnit.SECONDS)).isTrue();
974+
}
975+
976+
@Test
977+
void whenTypeSetWithCustomizerThenCustomizerTypeIsUsed(
978+
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
979+
assertThat(consumerFactory.getSpec("rpl-typeSetCustomizer-topic"))
980+
.extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
981+
.isEqualTo(SubscriptionType.Failover);
982+
pulsarTemplate.send("rpl-typeSetCustomizer-topic", "hello-rpl-typeSetCustomizer");
983+
assertThat(latchWithCustomizer.await(10, TimeUnit.SECONDS)).isTrue();
984+
}
985+
986+
@Configuration(proxyBeanMethods = false)
987+
static class WithSpecificTypesConfig {
988+
989+
@Bean
990+
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubTypeCustomizer() {
991+
return (b) -> b.subscriptionType(SubscriptionType.Shared);
992+
}
993+
994+
@ReactivePulsarListener(topics = "rpl-typeSetConsumerFactory-topic",
995+
subscriptionName = "rpl-typeSetConsumerFactory-sub", subscriptionType = {},
996+
consumerCustomizer = "subscriptionInitialPositionEarliest")
997+
Mono<Void> listenWithTypeSetOnlyOnConsumerFactory(String ignored) {
998+
latchTypeSetConsumerFactory.countDown();
999+
return Mono.empty();
1000+
}
1001+
1002+
@ReactivePulsarListener(topics = "rpl-typeSetAnnotation-topic",
1003+
subscriptionName = "rpl-typeSetAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared,
1004+
consumerCustomizer = "subscriptionInitialPositionEarliest")
1005+
Mono<Void> listenWithTypeSetOnAnnotation(String ignored) {
1006+
latchTypeSetAnnotation.countDown();
1007+
return Mono.empty();
1008+
}
1009+
1010+
@ReactivePulsarListener(topics = "rpl-typeSetCustomizer-topic",
1011+
subscriptionName = "rpl-typeSetCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared,
1012+
consumerCustomizer = "myCustomizer")
1013+
Mono<Void> listenWithTypeSetInCustomizer(String ignored) {
1014+
latchWithCustomizer.countDown();
1015+
return Mono.empty();
1016+
}
1017+
1018+
@Bean
1019+
public ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myCustomizer() {
1020+
return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
1021+
.subscriptionType(SubscriptionType.Failover);
1022+
}
1023+
1024+
}
1025+
1026+
}
1027+
1028+
}
1029+
1030+
static class ConsumerTrackingReactivePulsarConsumerFactory<T> implements ReactivePulsarConsumerFactory<T> {
1031+
1032+
private Map<String, ReactiveMessageConsumerSpec> topicNameToConsumerSpec = new HashMap<>();
1033+
1034+
private ReactivePulsarConsumerFactory<T> delegate;
1035+
1036+
ConsumerTrackingReactivePulsarConsumerFactory(ReactivePulsarConsumerFactory<T> delegate) {
1037+
this.delegate = delegate;
1038+
}
1039+
1040+
@Override
1041+
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema) {
1042+
var consumer = this.delegate.createConsumer(schema);
1043+
storeSpec(consumer);
1044+
return consumer;
1045+
}
1046+
1047+
@Override
1048+
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
1049+
List<ReactiveMessageConsumerBuilderCustomizer<T>> reactiveMessageConsumerBuilderCustomizers) {
1050+
var consumer = this.delegate.createConsumer(schema, reactiveMessageConsumerBuilderCustomizers);
1051+
storeSpec(consumer);
1052+
return consumer;
1053+
}
1054+
1055+
private void storeSpec(ReactiveMessageConsumer<T> consumer) {
1056+
var consumerSpec = (ReactiveMessageConsumerSpec) ReflectionTestUtils.getField(consumer, "consumerSpec");
1057+
var topicNamesKey = !ObjectUtils.isEmpty(consumerSpec.getTopicNames()) ? consumerSpec.getTopicNames().get(0)
1058+
: "no-topics-set";
1059+
this.topicNameToConsumerSpec.put(topicNamesKey, consumerSpec);
1060+
}
1061+
1062+
ReactiveMessageConsumerSpec getSpec(String topic) {
1063+
return this.topicNameToConsumerSpec.get(topic);
1064+
}
1065+
1066+
}
1067+
8971068
}

spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListener.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,12 @@
7474
String subscriptionName() default "";
7575

7676
/**
77-
* Pulsar subscription type for this listener.
78-
* @return the {@code subscriptionType} for this listener
77+
* Pulsar subscription type for this listener - expected to be a single element array
78+
* with subscription type or empty array to indicate null type.
79+
* @return single element array with the subscription type or empty array to indicate
80+
* no type chosen by user
7981
*/
80-
SubscriptionType subscriptionType() default SubscriptionType.Exclusive;
82+
SubscriptionType[] subscriptionType() default { SubscriptionType.Exclusive };
8183

8284
/**
8385
* Pulsar schema type for this listener.
@@ -114,7 +116,7 @@
114116
* a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to
115117
* obtain the value.
116118
* <p>
117-
* SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
119+
* SpEL {@code #{...}} and property placeholders {@code ${...}} are supported.
118120
* @return true to auto start, false to not auto start.
119121
*/
120122
String autoStartup() default "";

spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListenerAnnotationBeanPostProcessor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ private void processPulsarListenerAnnotation(MethodPulsarListenerEndpoint<?> end
226226
endpoint.setId(getEndpointId(pulsarListener));
227227
endpoint.setTopics(topics);
228228
endpoint.setTopicPattern(topicPattern);
229-
endpoint.setSubscriptionType(pulsarListener.subscriptionType());
229+
resolveSubscriptionType(endpoint, pulsarListener);
230230
endpoint.setSchemaType(pulsarListener.schemaType());
231231
endpoint.setAckMode(pulsarListener.ackMode());
232232

@@ -250,6 +250,14 @@ private void processPulsarListenerAnnotation(MethodPulsarListenerEndpoint<?> end
250250
resolveConsumerCustomizer(endpoint, pulsarListener);
251251
}
252252

253+
private void resolveSubscriptionType(MethodPulsarListenerEndpoint<?> endpoint, PulsarListener pulsarListener) {
254+
Assert.state(pulsarListener.subscriptionType().length <= 1,
255+
() -> "PulsarListener.subscriptionType must have 0 or 1 elements");
256+
if (pulsarListener.subscriptionType().length == 1) {
257+
endpoint.setSubscriptionType(pulsarListener.subscriptionType()[0]);
258+
}
259+
}
260+
253261
@SuppressWarnings({ "rawtypes" })
254262
private void resolvePulsarConsumerErrorHandler(MethodPulsarListenerEndpoint<?> endpoint,
255263
PulsarListener pulsarListener) {

0 commit comments

Comments
 (0)