Skip to content

Commit 18aff78

Browse files
artembilantzolov
authored andcommitted
Fix JMS Inbound Endpoints for observation
The `JmsMessageDrivenEndpoint` delegates all the hard work to the `ChannelPublishingJmsMessageListener`, but missed to propagate an `ObservationRegistry` and other related options. The `JmsInboundGateway` is worse: it delegated to the `JmsMessageDrivenEndpoint` * Add `IntegrationObservation.HANDLER` observation to the `MessagingGatewaySupport.send()` operation: used by the delegate in the `ChannelPublishingJmsMessageListener` * Expose and propagate observation-related options from `JmsInboundGateway` and `JmsMessageDrivenEndpoint` * Expose `observationConvention()` option on the `MessagingGatewaySpec` and `MessageProducerSpec` * Remove unused imports * Do not start a new `RECEIVER` observation if there is already `SERVER` one * Fix `MessagingGatewaySupport` for `Observation.NOOP` check. The parent process may still use `ObservationRegistry.NOOP` which sets `Observation.NOOP` instance into the current context and thread local. * fix merge conflicts **Cherry-pick to `6.1.x` & `6.0.x`**
1 parent 0b7c769 commit 18aff78

File tree

8 files changed

+209
-39
lines changed

8 files changed

+209
-39
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,7 @@ project('spring-integration-jms') {
743743
testImplementation "org.apache.activemq:artemis-jakarta-client:$artemisVersion"
744744
testImplementation 'org.springframework:spring-oxm'
745745
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
746+
testImplementation 'io.micrometer:micrometer-observation-test'
746747
}
747748
}
748749

spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageProducerSpec.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import org.springframework.integration.endpoint.MessageProducerSupport;
2020
import org.springframework.integration.support.ErrorMessageStrategy;
21+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
2122
import org.springframework.messaging.MessageChannel;
2223

2324
/**
@@ -151,4 +152,16 @@ public S errorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
151152
return _this();
152153
}
153154

155+
/**
156+
* Provide a custom {@link MessageReceiverObservationConvention}.
157+
* @param observationConvention the observation convention to use.
158+
* @return the spec.
159+
* @since 6.0.8
160+
* @see MessageProducerSupport#setObservationConvention(MessageReceiverObservationConvention)
161+
*/
162+
public S observationConvention(MessageReceiverObservationConvention observationConvention) {
163+
this.target.setObservationConvention(observationConvention);
164+
return _this();
165+
}
166+
154167
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/MessagingGatewaySpec.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import org.springframework.integration.gateway.MessagingGatewaySupport;
2020
import org.springframework.integration.mapping.InboundMessageMapper;
2121
import org.springframework.integration.mapping.OutboundMessageMapper;
22+
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
2223
import org.springframework.messaging.MessageChannel;
2324

2425
/**
@@ -204,4 +205,16 @@ public S shouldTrack(boolean shouldTrack) {
204205
return _this();
205206
}
206207

208+
/**
209+
* Provide a custom {@link MessageRequestReplyReceiverObservationConvention}.
210+
* @param observationConvention the observation convention to use.
211+
* @return the spec.
212+
* @since 6.0.8
213+
* @see MessagingGatewaySupport#setObservationConvention(MessageRequestReplyReceiverObservationConvention)
214+
*/
215+
public S observationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) {
216+
this.target.setObservationConvention(observationConvention);
217+
return _this();
218+
}
219+
207220
}

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Set;
2121
import java.util.concurrent.ConcurrentHashMap;
2222

23+
import io.micrometer.observation.Observation;
2324
import io.micrometer.observation.ObservationRegistry;
2425
import org.reactivestreams.Publisher;
2526
import org.reactivestreams.Subscriber;
@@ -54,12 +55,16 @@
5455
import org.springframework.integration.support.converter.SimpleMessageConverter;
5556
import org.springframework.integration.support.management.IntegrationInboundManagement;
5657
import org.springframework.integration.support.management.IntegrationManagedResource;
58+
import org.springframework.integration.support.management.TrackableComponent;
5759
import org.springframework.integration.support.management.metrics.MeterFacade;
5860
import org.springframework.integration.support.management.metrics.MetricsCaptor;
5961
import org.springframework.integration.support.management.metrics.SampleFacade;
6062
import org.springframework.integration.support.management.metrics.TimerFacade;
63+
import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention;
6164
import org.springframework.integration.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention;
6265
import org.springframework.integration.support.management.observation.IntegrationObservation;
66+
import org.springframework.integration.support.management.observation.MessageReceiverContext;
67+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
6368
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverContext;
6469
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
6570
import org.springframework.lang.Nullable;
@@ -88,7 +93,7 @@
8893
*/
8994
@IntegrationManagedResource
9095
public abstract class MessagingGatewaySupport extends AbstractEndpoint
91-
implements org.springframework.integration.support.management.TrackableComponent,
96+
implements TrackableComponent,
9297
IntegrationInboundManagement, IntegrationPattern {
9398

9499
private static final long DEFAULT_TIMEOUT = 1000L;
@@ -141,6 +146,8 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint
141146
@Nullable
142147
private MessageRequestReplyReceiverObservationConvention observationConvention;
143148

149+
private MessageReceiverObservationConvention receiverObservationConvention;
150+
144151
private volatile AbstractEndpoint replyMessageCorrelator;
145152

146153
private volatile boolean initialized;
@@ -382,6 +389,10 @@ public void setObservationConvention(
382389
this.observationConvention = observationConvention;
383390
}
384391

392+
public void setReceiverObservationConvention(MessageReceiverObservationConvention receiverObservationConvention) {
393+
this.receiverObservationConvention = receiverObservationConvention;
394+
}
395+
385396
@Override
386397
protected void onInit() {
387398
Assert.state(!(this.requestChannelName != null && this.requestChannel != null),
@@ -459,27 +470,65 @@ protected void send(Object object) {
459470
MessageChannel channel = getRequestChannel();
460471
Assert.state(channel != null,
461472
"send is not supported, because no request channel has been configured");
462-
SampleFacade sample = null;
463-
if (this.metricsCaptor != null) {
464-
sample = this.metricsCaptor.start();
473+
474+
Message<?> requestMessage = this.messagingTemplate.doConvert(object, null, this.historyWritingPostProcessor);
475+
476+
if (!ObservationRegistry.NOOP.equals(this.observationRegistry)
477+
&& (this.observationRegistry.getCurrentObservation() == null
478+
|| Observation.NOOP.equals(this.observationRegistry.getCurrentObservation()))) {
479+
480+
sendWithObservation(channel, requestMessage);
481+
}
482+
else if (this.metricsCaptor != null) {
483+
sendWithMetrics(channel, requestMessage);
465484
}
485+
else {
486+
doSend(channel, requestMessage);
487+
}
488+
}
489+
490+
private void sendWithObservation(MessageChannel channel, Message<?> message) {
466491
try {
467-
this.messagingTemplate.convertAndSend(channel, object, this.historyWritingPostProcessor);
468-
if (sample != null) {
469-
sample.stop(sendTimer());
470-
}
492+
IntegrationObservation.HANDLER.observation(
493+
this.receiverObservationConvention,
494+
DefaultMessageReceiverObservationConvention.INSTANCE,
495+
() -> new MessageReceiverContext(message, getComponentName()),
496+
this.observationRegistry)
497+
.observe(() -> this.messagingTemplate.send(channel, message));
471498
}
472-
catch (Exception e) {
473-
if (sample != null) {
474-
sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
475-
}
476-
MessageChannel errorChan = getErrorChannel();
477-
if (errorChan != null) {
478-
this.messagingTemplate.send(errorChan, new ErrorMessage(e));
479-
}
480-
else {
481-
rethrow(e, "failed to send message");
482-
}
499+
catch (Exception ex) {
500+
sendErrorMessage(ex, message);
501+
}
502+
}
503+
504+
private void sendWithMetrics(MessageChannel channel, Message<?> message) {
505+
SampleFacade sample = this.metricsCaptor.start();
506+
try {
507+
this.messagingTemplate.send(channel, message);
508+
sample.stop(sendTimer());
509+
}
510+
catch (Exception ex) {
511+
sample.stop(buildSendTimer(false, ex.getClass().getSimpleName()));
512+
sendErrorMessage(ex, message);
513+
}
514+
}
515+
516+
private void doSend(MessageChannel channel, Message<?> message) {
517+
try {
518+
this.messagingTemplate.send(channel, message);
519+
}
520+
catch (Exception ex) {
521+
sendErrorMessage(ex, message);
522+
}
523+
}
524+
525+
private void sendErrorMessage(Exception exception, Message<?> failedMessage) {
526+
MessageChannel errorChan = getErrorChannel();
527+
if (errorChan != null) {
528+
this.messagingTemplate.send(errorChan, buildErrorMessage(failedMessage, exception));
529+
}
530+
else {
531+
rethrow(exception, "failed to send message");
483532
}
484533
}
485534

spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.Map;
2020

21+
import io.micrometer.observation.ObservationRegistry;
2122
import jakarta.jms.DeliveryMode;
2223
import jakarta.jms.Destination;
2324
import jakarta.jms.InvalidDestinationException;
@@ -35,13 +36,17 @@
3536
import org.springframework.integration.support.DefaultMessageBuilderFactory;
3637
import org.springframework.integration.support.MessageBuilderFactory;
3738
import org.springframework.integration.support.management.TrackableComponent;
39+
import org.springframework.integration.support.management.metrics.MetricsCaptor;
40+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
41+
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
3842
import org.springframework.integration.support.utils.IntegrationUtils;
3943
import org.springframework.jms.listener.SessionAwareMessageListener;
4044
import org.springframework.jms.support.JmsUtils;
4145
import org.springframework.jms.support.converter.MessageConverter;
4246
import org.springframework.jms.support.converter.SimpleMessageConverter;
4347
import org.springframework.jms.support.destination.DestinationResolver;
4448
import org.springframework.jms.support.destination.DynamicDestinationResolver;
49+
import org.springframework.lang.Nullable;
4550
import org.springframework.messaging.Message;
4651
import org.springframework.messaging.MessageChannel;
4752
import org.springframework.messaging.MessagingException;
@@ -304,6 +309,26 @@ public void setExtractReplyPayload(boolean extractReplyPayload) {
304309
this.extractReplyPayload = extractReplyPayload;
305310
}
306311

312+
public void setMetricsCaptor(MetricsCaptor captor) {
313+
this.gatewayDelegate.registerMetricsCaptor(captor);
314+
}
315+
316+
public void setObservationRegistry(ObservationRegistry observationRegistry) {
317+
this.gatewayDelegate.registerObservationRegistry(observationRegistry);
318+
}
319+
320+
public void setRequestReplyObservationConvention(
321+
@Nullable MessageRequestReplyReceiverObservationConvention observationConvention) {
322+
323+
this.gatewayDelegate.setObservationConvention(observationConvention);
324+
}
325+
326+
public void setReceiverObservationConvention(
327+
@Nullable MessageReceiverObservationConvention observationConvention) {
328+
329+
this.gatewayDelegate.setReceiverObservationConvention(observationConvention);
330+
}
331+
307332
@Override
308333
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
309334
this.beanFactory = beanFactory;

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,10 +16,14 @@
1616

1717
package org.springframework.integration.jms;
1818

19+
import io.micrometer.observation.ObservationRegistry;
20+
1921
import org.springframework.beans.BeansException;
2022
import org.springframework.context.ApplicationContext;
2123
import org.springframework.integration.context.OrderlyShutdownCapable;
2224
import org.springframework.integration.gateway.MessagingGatewaySupport;
25+
import org.springframework.integration.support.management.metrics.MetricsCaptor;
26+
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
2327
import org.springframework.jms.listener.AbstractMessageListenerContainer;
2428
import org.springframework.messaging.MessageChannel;
2529

@@ -114,28 +118,38 @@ public void setShutdownContainerOnStop(boolean shutdownContainerOnStop) {
114118
this.endpoint.setShutdownContainerOnStop(shutdownContainerOnStop);
115119
}
116120

121+
@Override
122+
public void registerMetricsCaptor(MetricsCaptor metricsCaptorToRegister) {
123+
super.registerMetricsCaptor(metricsCaptorToRegister);
124+
this.endpoint.registerMetricsCaptor(metricsCaptorToRegister);
125+
}
117126

118127
@Override
119-
public String getComponentType() {
120-
return this.endpoint.getComponentType();
128+
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
129+
super.registerObservationRegistry(observationRegistry);
130+
this.endpoint.registerObservationRegistry(observationRegistry);
121131
}
122132

123133
@Override
124-
public void setComponentName(String componentName) {
125-
super.setComponentName(componentName);
126-
this.endpoint.setComponentName(getComponentName());
134+
public void setObservationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) {
135+
super.setObservationConvention(observationConvention);
136+
this.endpoint.getListener().setRequestReplyObservationConvention(observationConvention);
137+
}
138+
139+
@Override
140+
public String getComponentType() {
141+
return this.endpoint.getComponentType();
127142
}
128143

129144
@Override
130145
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
131146
super.setApplicationContext(applicationContext);
132147
this.endpoint.setApplicationContext(applicationContext);
133-
this.endpoint.setBeanFactory(applicationContext);
134-
this.endpoint.getListener().setBeanFactory(applicationContext);
135148
}
136149

137150
@Override
138151
protected void onInit() {
152+
this.endpoint.setComponentName(getComponentName());
139153
this.endpoint.afterPropertiesSet();
140154
}
141155

0 commit comments

Comments
 (0)