diff --git a/sdk/spring/CHANGELOG.md b/sdk/spring/CHANGELOG.md index 4960782d5487..34db5765146c 100644 --- a/sdk/spring/CHANGELOG.md +++ b/sdk/spring/CHANGELOG.md @@ -10,6 +10,27 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module. - Add ConnectionDetails for ServiceBus. [#44019](https://github.com/Azure/azure-sdk-for-java/pull/44019). +#### Breaking Changes + +- Change sender's default JmsConnectionFactory from ServiceBusJmsConnectionFactory to CachingConnectionFactory. [#47923](https://github.com/Azure/azure-sdk-for-java/issues/47923) + +The ConnectionFactory type is determined by the following configuration properties: + + | `spring.jms.servicebus.pool.enabled` | `spring.jms.cache.enabled` | Sender ConnectionFactory | Receiver ConnectionFactory | + |--------------------------------------|----------------------------|--------------------------------|--------------------------------| + | not set | not set | CachingConnectionFactory | ServiceBusJmsConnectionFactory | + | not set | true | CachingConnectionFactory | CachingConnectionFactory | + | not set | false | ServiceBusJmsConnectionFactory | ServiceBusJmsConnectionFactory | + | true | not set | JmsPoolConnectionFactory | JmsPoolConnectionFactory | + | true | true | CachingConnectionFactory | CachingConnectionFactory | + | true | false | JmsPoolConnectionFactory | JmsPoolConnectionFactory | + | false | not set | CachingConnectionFactory | ServiceBusJmsConnectionFactory | + | false | true | CachingConnectionFactory | CachingConnectionFactory | + | false | false | ServiceBusJmsConnectionFactory | ServiceBusJmsConnectionFactory | + + **Note:** `CachingConnectionFactory` and `JmsPoolConnectionFactory` will be used only when they exist in classpath. + + ### Spring Cloud Azure Docker Compose This section includes changes in `spring-cloud-azure-docker-compose` module. diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsConnectionFactoryConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsConnectionFactoryConfiguration.java index c48fb2de2f72..539acfb9d416 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsConnectionFactoryConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsConnectionFactoryConfiguration.java @@ -14,10 +14,10 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionRegistry; -import org.springframework.boot.jms.autoconfigure.JmsPoolConnectionFactoryFactory; -import org.springframework.boot.jms.autoconfigure.JmsProperties; import org.springframework.boot.context.properties.bind.BindResult; import org.springframework.boot.context.properties.bind.Binder; +import org.springframework.boot.jms.autoconfigure.JmsPoolConnectionFactoryFactory; +import org.springframework.boot.jms.autoconfigure.JmsProperties; import org.springframework.context.EnvironmentAware; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.ImportBeanDefinitionRegistrar; @@ -37,6 +37,58 @@ */ @Import(ServiceBusJmsConnectionFactoryConfiguration.Registrar.class) class ServiceBusJmsConnectionFactoryConfiguration { + static final int NOT_CONFIGURED = 0; + static final int TRUE = 1; + static final int FALSE = 2; + static final int POOL = 0; + static final int CACHE = 1; + static final int SERVICE_BUS = 2; + + /** + * Creates a ServiceBusJmsConnectionFactory using the provided properties and customizers. + * This is a shared helper method used by both sender and receiver configurations. + * + * @param properties the Azure Service Bus JMS properties + * @param customizers the list of customizers to apply + * @return a configured ServiceBusJmsConnectionFactory instance + */ + static ServiceBusJmsConnectionFactory createServiceBusJmsConnectionFactory( + AzureServiceBusJmsProperties properties, + java.util.List customizers) { + return new ServiceBusJmsConnectionFactoryFactory(properties, customizers) + .createConnectionFactory(ServiceBusJmsConnectionFactory.class); + } + + /** + * Registers the appropriate ConnectionFactory bean based on configuration properties. + *

+ * The ConnectionFactory type is determined by the following table: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
spring.jms.servicebus.pool.enabledspring.jms.cache.enabledSender ConnectionFactory
not setnot setCachingConnectionFactory
not settrueCachingConnectionFactory
not setfalseServiceBusJmsConnectionFactory
truenot setJmsPoolConnectionFactory
truetrueCachingConnectionFactory
truefalseJmsPoolConnectionFactory
falsenot setCachingConnectionFactory
falsetrueCachingConnectionFactory
falsefalseServiceBusJmsConnectionFactory
+ *

+ */ + private static final int[][] DECISION_TABLE = { + // pool: not set + {CACHE, CACHE, SERVICE_BUS}, // cache: not set, true, false + // pool: true + {POOL, CACHE, POOL}, // cache: not set, true, false + // pool: false + {CACHE, CACHE, SERVICE_BUS} // cache: not set, true, false + }; static class Registrar implements BeanFactoryAware, EnvironmentAware, ImportBeanDefinitionRegistrar { @@ -61,17 +113,44 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BindResult poolEnabledResult = Binder.get(environment).bind("spring.jms.servicebus.pool.enabled", Boolean.class); BindResult cacheEnabledResult = Binder.get(environment).bind("spring.jms.cache.enabled", Boolean.class); - if (isPoolConnectionFactoryClassPresent() && poolEnabledResult.orElseGet(() -> false)) { - registerJmsPoolConnectionFactory(registry); - return; + switch (getFactoryType(poolEnabledResult, cacheEnabledResult, DECISION_TABLE)) { + case POOL: + registerJmsPoolConnectionFactory(registry); + break; + case CACHE: + registerJmsCachingConnectionFactory(registry); + break; + default: + registerServiceBusJmsConnectionFactory(registry); } + } - if (isCacheConnectionFactoryClassPresent() && cacheEnabledResult.orElseGet(() -> false)) { - registerJmsCachingConnectionFactory(registry); - return; + static int getFactoryType(BindResult poolEnabledResult, BindResult cacheEnabledResult, int[][] decisionTable) { + int poolIndex = NOT_CONFIGURED; + if (poolEnabledResult.isBound()) { + poolIndex = poolEnabledResult.get() ? TRUE : FALSE; + } + int cacheIndex = NOT_CONFIGURED; + if (cacheEnabledResult.isBound()) { + cacheIndex = cacheEnabledResult.get() ? TRUE : FALSE; + } + int configuredFactoryType = decisionTable[poolIndex][cacheIndex]; + switch (configuredFactoryType) { + case POOL: + if (isPoolConnectionFactoryClassPresent()) { + return POOL; + } else { + return SERVICE_BUS; + } + case CACHE: + if (isCacheConnectionFactoryClassPresent()) { + return CACHE; + } else { + return SERVICE_BUS; + } + default: + return SERVICE_BUS; } - - registerServiceBusJmsConnectionFactory(registry); } private static boolean isCacheConnectionFactoryClassPresent() { @@ -117,9 +196,9 @@ private void registerJmsPoolConnectionFactory(BeanDefinitionRegistry registry) { private ServiceBusJmsConnectionFactory createServiceBusJmsConnectionFactory() { AzureServiceBusJmsProperties serviceBusJmsProperties = beanFactory.getBean(AzureServiceBusJmsProperties.class); ObjectProvider factoryCustomizers = beanFactory.getBeanProvider(AzureServiceBusJmsConnectionFactoryCustomizer.class); - return new ServiceBusJmsConnectionFactoryFactory(serviceBusJmsProperties, - factoryCustomizers.orderedStream().collect(Collectors.toList())) - .createConnectionFactory(ServiceBusJmsConnectionFactory.class); + return ServiceBusJmsConnectionFactoryConfiguration.createServiceBusJmsConnectionFactory( + serviceBusJmsProperties, + factoryCustomizers.orderedStream().collect(Collectors.toList())); } } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsContainerConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsContainerConfiguration.java index 82bff9332874..5fba2b174c34 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsContainerConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsContainerConfiguration.java @@ -3,25 +3,88 @@ package com.azure.spring.cloud.autoconfigure.implementation.jms; +import com.azure.servicebus.jms.ServiceBusJmsConnectionFactory; import com.azure.spring.cloud.autoconfigure.implementation.jms.properties.AzureServiceBusJmsProperties; +import com.azure.spring.cloud.autoconfigure.jms.AzureServiceBusJmsConnectionFactoryCustomizer; import jakarta.jms.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.bind.BindResult; +import org.springframework.boot.context.properties.bind.Binder; import org.springframework.boot.jms.autoconfigure.DefaultJmsListenerContainerFactoryConfigurer; +import org.springframework.boot.jms.autoconfigure.JmsProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; +import java.util.stream.Collectors; + +import static com.azure.spring.cloud.autoconfigure.implementation.jms.ServiceBusJmsConnectionFactoryConfiguration.CACHE; +import static com.azure.spring.cloud.autoconfigure.implementation.jms.ServiceBusJmsConnectionFactoryConfiguration.POOL; +import static com.azure.spring.cloud.autoconfigure.implementation.jms.ServiceBusJmsConnectionFactoryConfiguration.SERVICE_BUS; +import static com.azure.spring.cloud.autoconfigure.implementation.jms.ServiceBusJmsConnectionFactoryConfiguration.Registrar.getFactoryType; + @Configuration(proxyBeanMethods = false) @ConditionalOnClass(EnableJms.class) -class ServiceBusJmsContainerConfiguration { +class ServiceBusJmsContainerConfiguration implements DisposableBean { + + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusJmsContainerConfiguration.class); + + /** + *

+ * The ConnectionFactory type is determined by the following table: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
spring.jms.servicebus.pool.enabledspring.jms.cache.enabledReceiver ConnectionFactory
not setnot setServiceBusJmsConnectionFactory
not settrueCachingConnectionFactory
not setfalseServiceBusJmsConnectionFactory
truenot setJmsPoolConnectionFactory
truetrueCachingConnectionFactory
truefalseJmsPoolConnectionFactory
falsenot setServiceBusJmsConnectionFactory
falsetrueCachingConnectionFactory
falsefalseServiceBusJmsConnectionFactory
+ */ + private static final int[][] DECISION_TABLE = { + // pool: not set + {SERVICE_BUS, CACHE, SERVICE_BUS}, // cache: not set, true, false + // pool: true + {POOL, CACHE, POOL}, // cache: not set, true, false + // pool: false + {SERVICE_BUS, CACHE, SERVICE_BUS} // cache: not set, true, false + }; private final AzureServiceBusJmsProperties azureServiceBusJMSProperties; + private final ObjectProvider factoryCustomizers; + private final Environment environment; + private final JmsProperties jmsProperties; + + // Memoized dedicated receiver ConnectionFactory instances to avoid duplicates and enable lifecycle management + // Use ConnectionFactory type instead of concrete types to avoid NoClassDefFoundError when optional dependencies are missing + private volatile ConnectionFactory dedicatedCachingConnectionFactory; + private volatile ConnectionFactory dedicatedPoolConnectionFactory; + private volatile ServiceBusJmsConnectionFactory dedicatedServiceBusConnectionFactory; - ServiceBusJmsContainerConfiguration(AzureServiceBusJmsProperties azureServiceBusJMSProperties) { + ServiceBusJmsContainerConfiguration(AzureServiceBusJmsProperties azureServiceBusJMSProperties, + ObjectProvider factoryCustomizers, + Environment environment, + JmsProperties jmsProperties) { this.azureServiceBusJMSProperties = azureServiceBusJMSProperties; + this.factoryCustomizers = factoryCustomizers; + this.environment = environment; + this.jmsProperties = jmsProperties; } @Bean @@ -29,7 +92,9 @@ class ServiceBusJmsContainerConfiguration { JmsListenerContainerFactory jmsListenerContainerFactory( DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory(); - configurer.configure(jmsListenerContainerFactory, connectionFactory); + // Use the bean ConnectionFactory if it's pooled or cached, otherwise create a dedicated one for receiver + ConnectionFactory receiverConnectionFactory = getReceiverConnectionFactory(connectionFactory); + configurer.configure(jmsListenerContainerFactory, receiverConnectionFactory); jmsListenerContainerFactory.setPubSubDomain(Boolean.FALSE); configureCommonListenerContainerFactory(jmsListenerContainerFactory); return jmsListenerContainerFactory; @@ -40,13 +105,103 @@ JmsListenerContainerFactory jmsListenerContainerFactory( JmsListenerContainerFactory topicJmsListenerContainerFactory( DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory(); - configurer.configure(jmsListenerContainerFactory, connectionFactory); + // Use the bean ConnectionFactory if it's pooled or cached, otherwise create a dedicated one for receiver + ConnectionFactory receiverConnectionFactory = getReceiverConnectionFactory(connectionFactory); + configurer.configure(jmsListenerContainerFactory, receiverConnectionFactory); jmsListenerContainerFactory.setPubSubDomain(Boolean.TRUE); configureCommonListenerContainerFactory(jmsListenerContainerFactory); configureTopicListenerContainerFactory(jmsListenerContainerFactory); return jmsListenerContainerFactory; } + /** + * Determines the appropriate ConnectionFactory for JMS listener containers based on configuration properties. + * + * @param connectionFactory the ConnectionFactory bean registered by {@link ServiceBusJmsConnectionFactoryConfiguration} + * @return the ConnectionFactory to use for the receiver + */ + private ConnectionFactory getReceiverConnectionFactory(ConnectionFactory connectionFactory) { + BindResult poolEnabledResult = Binder.get(environment).bind("spring.jms.servicebus.pool.enabled", Boolean.class); + BindResult cacheEnabledResult = Binder.get(environment).bind("spring.jms.cache.enabled", Boolean.class); + + switch (getFactoryType(poolEnabledResult, cacheEnabledResult, DECISION_TABLE)) { + case POOL: + if (isJmsPoolConnectionFactory(connectionFactory)) { + return connectionFactory; + } else { + return getOrCreateDedicatedPoolConnectionFactory(); + } + case CACHE: + if (isCachingConnectionFactory(connectionFactory)) { + return connectionFactory; + } else { + return getOrCreateDedicatedCachingConnectionFactory(); + } + default: + return getOrCreateDedicatedServiceBusConnectionFactory(); + } + } + + private boolean isJmsPoolConnectionFactory(ConnectionFactory connectionFactory) { + return "org.messaginghub.pooled.jms.JmsPoolConnectionFactory".equals(connectionFactory.getClass().getName()); + } + + private boolean isCachingConnectionFactory(ConnectionFactory connectionFactory) { + return "org.springframework.jms.connection.CachingConnectionFactory".equals(connectionFactory.getClass().getName()); + } + + private synchronized ConnectionFactory getOrCreateDedicatedPoolConnectionFactory() { + if (dedicatedPoolConnectionFactory == null) { + try { + // Use reflection to create JmsPoolConnectionFactory to avoid hard dependency + Class poolClass = Class.forName("org.springframework.boot.jms.autoconfigure.JmsPoolConnectionFactoryProperties"); + Class factoryClass = Class.forName("org.springframework.boot.jms.autoconfigure.JmsPoolConnectionFactoryFactory"); + Object factoryInstance = factoryClass.getConstructor(poolClass) + .newInstance(azureServiceBusJMSProperties.getPool()); + dedicatedPoolConnectionFactory = (ConnectionFactory) factoryClass + .getMethod("createPooledConnectionFactory", ConnectionFactory.class) + .invoke(factoryInstance, createServiceBusJmsConnectionFactory()); + } catch (Exception e) { + throw new IllegalStateException("Failed to create JmsPoolConnectionFactory", e); + } + } + return dedicatedPoolConnectionFactory; + } + + private synchronized ConnectionFactory getOrCreateDedicatedCachingConnectionFactory() { + if (dedicatedCachingConnectionFactory == null) { + try { + // Use reflection to create CachingConnectionFactory to avoid hard dependency + Class cachingClass = Class.forName("org.springframework.jms.connection.CachingConnectionFactory"); + Object cacheFactory = cachingClass.getConstructor(ConnectionFactory.class) + .newInstance(createServiceBusJmsConnectionFactory()); + + JmsProperties.Cache cacheProperties = jmsProperties.getCache(); + cachingClass.getMethod("setCacheConsumers", boolean.class).invoke(cacheFactory, cacheProperties.isConsumers()); + cachingClass.getMethod("setCacheProducers", boolean.class).invoke(cacheFactory, cacheProperties.isProducers()); + cachingClass.getMethod("setSessionCacheSize", int.class).invoke(cacheFactory, cacheProperties.getSessionCacheSize()); + + dedicatedCachingConnectionFactory = (ConnectionFactory) cacheFactory; + } catch (Exception e) { + throw new IllegalStateException("Failed to create CachingConnectionFactory", e); + } + } + return dedicatedCachingConnectionFactory; + } + + private synchronized ServiceBusJmsConnectionFactory getOrCreateDedicatedServiceBusConnectionFactory() { + if (dedicatedServiceBusConnectionFactory == null) { + dedicatedServiceBusConnectionFactory = createServiceBusJmsConnectionFactory(); + } + return dedicatedServiceBusConnectionFactory; + } + + private ServiceBusJmsConnectionFactory createServiceBusJmsConnectionFactory() { + return ServiceBusJmsConnectionFactoryConfiguration.createServiceBusJmsConnectionFactory( + azureServiceBusJMSProperties, + factoryCustomizers.orderedStream().collect(Collectors.toList())); + } + private void configureCommonListenerContainerFactory(DefaultJmsListenerContainerFactory jmsListenerContainerFactory) { AzureServiceBusJmsProperties.Listener listener = azureServiceBusJMSProperties.getListener(); if (listener.getReplyQosSettings() != null) { @@ -69,4 +224,29 @@ private void configureTopicListenerContainerFactory(DefaultJmsListenerContainerF jmsListenerContainerFactory.setSubscriptionShared(listener.isSubscriptionShared()); } } + + @Override + public void destroy() throws Exception { + // Close dedicated ConnectionFactory instances to prevent resource leaks + // Use class name checks to avoid NoClassDefFoundError when optional dependencies are missing + if (dedicatedPoolConnectionFactory != null + && "org.messaginghub.pooled.jms.JmsPoolConnectionFactory".equals(dedicatedPoolConnectionFactory.getClass().getName())) { + try { + // Use reflection to call stop() to avoid hard dependency + dedicatedPoolConnectionFactory.getClass().getMethod("stop").invoke(dedicatedPoolConnectionFactory); + } catch (Exception e) { + LOGGER.warn("Failed to stop JmsPoolConnectionFactory during shutdown", e); + } + } + if (dedicatedCachingConnectionFactory != null + && "org.springframework.jms.connection.CachingConnectionFactory".equals(dedicatedCachingConnectionFactory.getClass().getName())) { + try { + // Use reflection to call destroy() to avoid hard dependency + dedicatedCachingConnectionFactory.getClass().getMethod("destroy").invoke(dedicatedCachingConnectionFactory); + } catch (Exception e) { + LOGGER.warn("Failed to destroy CachingConnectionFactory during shutdown", e); + } + } + // ServiceBusJmsConnectionFactory doesn't have a close method, so no cleanup needed + } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsAutoConfigurationTests.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsAutoConfigurationTests.java index efc13e46a700..0af016c748bd 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsAutoConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsAutoConfigurationTests.java @@ -287,14 +287,14 @@ void jmsListenerContainerFactoryConfiguredCorrectly(String pricingTier) { @ParameterizedTest @ValueSource(strings = {"standard", "premium"}) - void nativeConnectionFactoryBeanConfiguredByDefaultInJmsListenerContainerFactory(String pricingTier) { + void cachingConnectionFactoryBeanConfiguredByDefault(String pricingTier) { this.contextRunner .withPropertyValues( "spring.jms.servicebus.pricing-tier=" + pricingTier, "spring.jms.servicebus.connection-string=" + CONNECTION_STRING) .run(context -> { - assertThat(context).hasSingleBean(ServiceBusJmsConnectionFactory.class); - assertThat(context).doesNotHaveBean(CachingConnectionFactory.class); + assertThat(context).hasSingleBean(CachingConnectionFactory.class); + assertThat(context).doesNotHaveBean(ServiceBusJmsConnectionFactory.class); assertThat(context).doesNotHaveBean(JmsPoolConnectionFactory.class); }); } @@ -318,7 +318,7 @@ void jmsPoolConnectionFactoryBeanConfiguredExplicitly(String pricingTier) { @ParameterizedTest @ValueSource(strings = {"standard", "premium"}) - void jmsPoolConnectionFactoryBeanConfiguredByPoolEnableCacheEnable(String pricingTier) { + void cachingConnectionFactoryBeanConfiguredByPoolEnableCacheEnable(String pricingTier) { this.contextRunner .withPropertyValues( "spring.jms.servicebus.pricing-tier=" + pricingTier, @@ -327,9 +327,9 @@ void jmsPoolConnectionFactoryBeanConfiguredByPoolEnableCacheEnable(String pricin "spring.jms.cache.enabled=true" ) .run(context -> { - assertThat(context).hasSingleBean(JmsPoolConnectionFactory.class); + assertThat(context).hasSingleBean(CachingConnectionFactory.class); assertThat(context).doesNotHaveBean(ServiceBusJmsConnectionFactory.class); - assertThat(context).doesNotHaveBean(CachingConnectionFactory.class); + assertThat(context).doesNotHaveBean(JmsPoolConnectionFactory.class); } ); } @@ -387,7 +387,7 @@ void cachingConnectionFactoryBeanConfiguredByPoolDisableCacheEnable(String prici @ParameterizedTest @ValueSource(strings = {"standard", "premium"}) - void nativeConnectionFactoryBeanConfiguredByPoolDisable(String pricingTier) { + void cachingConnectionFactoryBeanConfiguredByPoolDisable(String pricingTier) { this.contextRunner .withPropertyValues( "spring.jms.servicebus.pricing-tier=" + pricingTier, @@ -395,8 +395,8 @@ void nativeConnectionFactoryBeanConfiguredByPoolDisable(String pricingTier) { "spring.jms.servicebus.pool.enabled=false" ) .run(context -> { - assertThat(context).hasSingleBean(ServiceBusJmsConnectionFactory.class); - assertThat(context).doesNotHaveBean(CachingConnectionFactory.class); + assertThat(context).hasSingleBean(CachingConnectionFactory.class); + assertThat(context).doesNotHaveBean(ServiceBusJmsConnectionFactory.class); assertThat(context).doesNotHaveBean(JmsPoolConnectionFactory.class); }); } @@ -435,4 +435,128 @@ void nativeConnectionFactoryBeanConfiguredByPoolDisableCacheDisable(String prici } ); } + + // Tests for receiver ConnectionFactory type verification + + @ParameterizedTest + @ValueSource(strings = {"standard", "premium"}) + void receiverUsesDedicatedServiceBusConnectionFactoryByDefault(String pricingTier) { + this.contextRunner + .withPropertyValues( + "spring.jms.servicebus.pricing-tier=" + pricingTier, + "spring.jms.servicebus.connection-string=" + CONNECTION_STRING + ) + .run(context -> { + // Sender bean is CachingConnectionFactory + CachingConnectionFactory senderBean = context.getBean(CachingConnectionFactory.class); + assertThat(senderBean).isNotNull(); + + // Get the listener factories and create containers to verify ConnectionFactory + DefaultJmsListenerContainerFactory queueFactory = context.getBean("jmsListenerContainerFactory", DefaultJmsListenerContainerFactory.class); + DefaultJmsListenerContainerFactory topicFactory = context.getBean("topicJmsListenerContainerFactory", DefaultJmsListenerContainerFactory.class); + + // Create containers to access their ConnectionFactory + DefaultMessageListenerContainer queueContainer = queueFactory.createListenerContainer(mock(JmsListenerEndpoint.class)); + DefaultMessageListenerContainer topicContainer = topicFactory.createListenerContainer(mock(JmsListenerEndpoint.class)); + + // Verify receiver uses dedicated ServiceBusJmsConnectionFactory, NOT the sender's CachingConnectionFactory bean + assertThat(queueContainer.getConnectionFactory()) + .isInstanceOf(ServiceBusJmsConnectionFactory.class) + .isNotSameAs(senderBean); + assertThat(topicContainer.getConnectionFactory()) + .isInstanceOf(ServiceBusJmsConnectionFactory.class) + .isNotSameAs(senderBean); + + // Verify both containers share the same dedicated instance (memoized) + assertThat(queueContainer.getConnectionFactory()).isSameAs(topicContainer.getConnectionFactory()); + }); + } + + @ParameterizedTest + @ValueSource(strings = {"standard", "premium"}) + void receiverUsesCachingConnectionFactoryWhenExplicitlyEnabled(String pricingTier) { + this.contextRunner + .withPropertyValues( + "spring.jms.servicebus.pricing-tier=" + pricingTier, + "spring.jms.servicebus.connection-string=" + CONNECTION_STRING, + "spring.jms.cache.enabled=true" + ) + .run(context -> { + // Both sender and receiver use CachingConnectionFactory bean + CachingConnectionFactory sharedBean = context.getBean(CachingConnectionFactory.class); + assertThat(sharedBean).isNotNull(); + + // Get the listener factories and create containers + DefaultJmsListenerContainerFactory queueFactory = context.getBean("jmsListenerContainerFactory", DefaultJmsListenerContainerFactory.class); + DefaultJmsListenerContainerFactory topicFactory = context.getBean("topicJmsListenerContainerFactory", DefaultJmsListenerContainerFactory.class); + + DefaultMessageListenerContainer queueContainer = queueFactory.createListenerContainer(mock(JmsListenerEndpoint.class)); + DefaultMessageListenerContainer topicContainer = topicFactory.createListenerContainer(mock(JmsListenerEndpoint.class)); + + // Verify receiver uses the same CachingConnectionFactory bean as sender + assertThat(queueContainer.getConnectionFactory()).isSameAs(sharedBean); + assertThat(topicContainer.getConnectionFactory()).isSameAs(sharedBean); + }); + } + + @ParameterizedTest + @ValueSource(strings = {"standard", "premium"}) + void receiverUsesPoolConnectionFactoryWhenExplicitlyEnabled(String pricingTier) { + this.contextRunner + .withPropertyValues( + "spring.jms.servicebus.pricing-tier=" + pricingTier, + "spring.jms.servicebus.connection-string=" + CONNECTION_STRING, + "spring.jms.servicebus.pool.enabled=true" + ) + .run(context -> { + // Both sender and receiver use JmsPoolConnectionFactory bean + JmsPoolConnectionFactory sharedBean = context.getBean(JmsPoolConnectionFactory.class); + assertThat(sharedBean).isNotNull(); + + // Get the listener factories and create containers + DefaultJmsListenerContainerFactory queueFactory = context.getBean("jmsListenerContainerFactory", DefaultJmsListenerContainerFactory.class); + DefaultJmsListenerContainerFactory topicFactory = context.getBean("topicJmsListenerContainerFactory", DefaultJmsListenerContainerFactory.class); + + DefaultMessageListenerContainer queueContainer = queueFactory.createListenerContainer(mock(JmsListenerEndpoint.class)); + DefaultMessageListenerContainer topicContainer = topicFactory.createListenerContainer(mock(JmsListenerEndpoint.class)); + + // Verify receiver uses the same JmsPoolConnectionFactory bean as sender + assertThat(queueContainer.getConnectionFactory()).isSameAs(sharedBean); + assertThat(topicContainer.getConnectionFactory()).isSameAs(sharedBean); + }); + } + + @ParameterizedTest + @ValueSource(strings = {"standard", "premium"}) + void receiverUsesDedicatedServiceBusConnectionFactoryWhenPoolDisabled(String pricingTier) { + this.contextRunner + .withPropertyValues( + "spring.jms.servicebus.pricing-tier=" + pricingTier, + "spring.jms.servicebus.connection-string=" + CONNECTION_STRING, + "spring.jms.servicebus.pool.enabled=false" + ) + .run(context -> { + // Sender uses CachingConnectionFactory bean + CachingConnectionFactory senderBean = context.getBean(CachingConnectionFactory.class); + assertThat(senderBean).isNotNull(); + + // Get the listener factories and create containers + DefaultJmsListenerContainerFactory queueFactory = context.getBean("jmsListenerContainerFactory", DefaultJmsListenerContainerFactory.class); + DefaultJmsListenerContainerFactory topicFactory = context.getBean("topicJmsListenerContainerFactory", DefaultJmsListenerContainerFactory.class); + + DefaultMessageListenerContainer queueContainer = queueFactory.createListenerContainer(mock(JmsListenerEndpoint.class)); + DefaultMessageListenerContainer topicContainer = topicFactory.createListenerContainer(mock(JmsListenerEndpoint.class)); + + // Receiver creates dedicated ServiceBusJmsConnectionFactory, NOT the sender's bean + assertThat(queueContainer.getConnectionFactory()) + .isInstanceOf(ServiceBusJmsConnectionFactory.class) + .isNotSameAs(senderBean); + assertThat(topicContainer.getConnectionFactory()) + .isInstanceOf(ServiceBusJmsConnectionFactory.class) + .isNotSameAs(senderBean); + + // Verify both containers share the same dedicated instance (memoized) + assertThat(queueContainer.getConnectionFactory()).isSameAs(topicContainer.getConnectionFactory()); + }); + } } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsConnectionFactoryConfigurationTests.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsConnectionFactoryConfigurationTests.java index 8ac71f7c9e03..b31906dba3f7 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsConnectionFactoryConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsConnectionFactoryConfigurationTests.java @@ -43,17 +43,28 @@ class ServiceBusJmsConnectionFactoryConfigurationTests { @ParameterizedTest @ValueSource(strings = { "standard", "premium" }) - void useDefaultNativeConnection(String pricingTier) { + void useDefaultCachingConnection(String pricingTier) { this.contextRunner .withPropertyValues( "spring.jms.servicebus.pricing-tier=" + pricingTier ) + .run(context -> assertThat(context).hasSingleBean(CachingConnectionFactory.class)); + } + + @ParameterizedTest + @ValueSource(strings = { "standard", "premium" }) + void cacheEnabledFalseUsesServiceBusConnectionFactory(String pricingTier) { + this.contextRunner + .withPropertyValues( + "spring.jms.servicebus.pricing-tier=" + pricingTier, + "spring.jms.cache.enabled=false" + ) .run(context -> assertThat(context).hasSingleBean(ServiceBusJmsConnectionFactory.class)); } @ParameterizedTest @ValueSource(strings = { "standard", "premium" }) - void enablePoolConnection(String pricingTier) { + void poolEnabledTrueWithCacheNotSetUsesPoolConnectionFactory(String pricingTier) { this.contextRunner .withPropertyValues( "spring.jms.servicebus.pricing-tier=" + pricingTier, @@ -63,49 +74,43 @@ void enablePoolConnection(String pricingTier) { } @ParameterizedTest - @ValueSource(strings = { "org.messaginghub.pooled.jms.JmsPoolConnectionFactory", "org.apache.commons.pool2.PooledObject" }) - void fallbackDefaultJmsConnectionDueNoPoolClasses(String poolClass) { + @ValueSource(strings = { "standard", "premium" }) + void bothPropertiesTrueUsesCachingConnectionFactory(String pricingTier) { this.contextRunner - .withClassLoader(new FilteredClassLoader(poolClass)) .withPropertyValues( - "spring.jms.servicebus.pricing-tier=premium" + "spring.jms.servicebus.pricing-tier=" + pricingTier, + "spring.jms.servicebus.pool.enabled=true", + "spring.jms.cache.enabled=true" ) - .run(context -> assertThat(context).hasSingleBean(ServiceBusJmsConnectionFactory.class)); + .run(context -> assertThat(context).hasSingleBean(CachingConnectionFactory.class)); } @ParameterizedTest @ValueSource(strings = { "standard", "premium" }) - void useCacheConnection(String pricingTier) { + void poolEnabledTrueWithCacheEnabledFalseUsesPoolConnectionFactory(String pricingTier) { this.contextRunner .withPropertyValues( "spring.jms.servicebus.pricing-tier=" + pricingTier, - "spring.jms.cache.enabled=true" + "spring.jms.servicebus.pool.enabled=true", + "spring.jms.cache.enabled=false" ) - .run(context -> assertThat(context).hasSingleBean(CachingConnectionFactory.class)); + .run(context -> assertThat(context).hasSingleBean(JmsPoolConnectionFactory.class)); } @ParameterizedTest @ValueSource(strings = { "standard", "premium" }) - void fallbackUseDefaultConnectionDueNoPoolAndCachingClasses(String pricingTier) { + void poolEnabledFalseWithDefaultCacheUsesCachingConnectionFactory(String pricingTier) { this.contextRunner - .withClassLoader(new FilteredClassLoader( - "org.apache.commons.pool2.PooledObject", - "org.messaginghub.pooled.jms.JmsPoolConnectionFactory", - "org.springframework.jms.connection.CachingConnectionFactory" - )) .withPropertyValues( - "spring.jms.servicebus.pricing-tier=" + pricingTier + "spring.jms.servicebus.pricing-tier=" + pricingTier, + "spring.jms.servicebus.pool.enabled=false" ) - .run(context -> { - assertThat(context).doesNotHaveBean(JmsPoolConnectionFactory.class); - assertThat(context).doesNotHaveBean(CachingConnectionFactory.class); - assertThat(context).hasSingleBean(ServiceBusJmsConnectionFactory.class); - }); + .run(context -> assertThat(context).hasSingleBean(CachingConnectionFactory.class)); } @ParameterizedTest @ValueSource(strings = { "standard", "premium" }) - void useServiceBusJmsConnection(String pricingTier) { + void bothPropertiesFalseUsesServiceBusConnectionFactory(String pricingTier) { this.contextRunner .withPropertyValues( "spring.jms.servicebus.pricing-tier=" + pricingTier, @@ -127,6 +132,54 @@ void useCacheConnectionViaAdditionConfigurationFile(String pricingTier) { .run(context -> assertThat(context).hasSingleBean(CachingConnectionFactory.class)); } + @ParameterizedTest + @ValueSource(strings = { "org.messaginghub.pooled.jms.JmsPoolConnectionFactory", "org.apache.commons.pool2.PooledObject" }) + void fallbackToServiceBusConnectionFactoryWhenPoolEnabledButClassNotPresent(String poolClass) { + this.contextRunner + .withClassLoader(new FilteredClassLoader(poolClass)) + .withPropertyValues( + "spring.jms.servicebus.pricing-tier=premium", + "spring.jms.servicebus.pool.enabled=true" + ) + .run(context -> { + assertThat(context).doesNotHaveBean(JmsPoolConnectionFactory.class); + assertThat(context).hasSingleBean(ServiceBusJmsConnectionFactory.class); + }); + } + + @Test + void fallbackToServiceBusConnectionFactoryWhenCacheEnabledButClassNotPresent() { + this.contextRunner + .withClassLoader(new FilteredClassLoader("org.springframework.jms.connection.CachingConnectionFactory")) + .withPropertyValues( + "spring.jms.servicebus.pricing-tier=premium", + "spring.jms.cache.enabled=true" + ) + .run(context -> { + assertThat(context).doesNotHaveBean(CachingConnectionFactory.class); + assertThat(context).hasSingleBean(ServiceBusJmsConnectionFactory.class); + }); + } + + @ParameterizedTest + @ValueSource(strings = { "standard", "premium" }) + void fallbackToServiceBusConnectionFactoryWhenNoCachingOrPoolClassesPresent(String pricingTier) { + this.contextRunner + .withClassLoader(new FilteredClassLoader( + "org.apache.commons.pool2.PooledObject", + "org.messaginghub.pooled.jms.JmsPoolConnectionFactory", + "org.springframework.jms.connection.CachingConnectionFactory" + )) + .withPropertyValues( + "spring.jms.servicebus.pricing-tier=" + pricingTier + ) + .run(context -> { + assertThat(context).doesNotHaveBean(JmsPoolConnectionFactory.class); + assertThat(context).doesNotHaveBean(CachingConnectionFactory.class); + assertThat(context).hasSingleBean(ServiceBusJmsConnectionFactory.class); + }); + } + @Test void cachingConnectionFactoryReusesSameProducerForSameDestination() throws Exception { // Create mock objects for JMS components diff --git a/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/servicebus/jms/ServiceBusJmsConnectionStringIT.java b/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/servicebus/jms/ServiceBusJmsConnectionStringIT.java index 2a97d3844206..49b97381ff6c 100644 --- a/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/servicebus/jms/ServiceBusJmsConnectionStringIT.java +++ b/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/servicebus/jms/ServiceBusJmsConnectionStringIT.java @@ -2,7 +2,6 @@ // Licensed under the MIT License. package com.azure.spring.cloud.integration.tests.servicebus.jms; -import com.azure.servicebus.jms.ServiceBusJmsConnectionFactory; import jakarta.jms.ConnectionFactory; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -12,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Import; +import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.test.context.ActiveProfiles; @@ -36,7 +36,7 @@ public ServiceBusJmsConnectionStringIT() { @Test @Timeout(70) void testJmsOperationViaConnStringAndCachingConnection() throws InterruptedException { - Assertions.assertSame(ServiceBusJmsConnectionFactory.class, connectionFactory.getClass()); + Assertions.assertInstanceOf(CachingConnectionFactory.class, connectionFactory); LOGGER.info("ServiceBusJmsConnectionStringIT begin."); this.exchangeMessage(jmsTemplate, CONNECTION_STRING_POOL_API_QUEUE_NAME); LOGGER.info("ServiceBusJmsConnectionStringIT end.");