Skip to content

Commit 37c92a1

Browse files
garyrussellartembilan
authored andcommitted
INT-4482: AMQP: Fix Double ErrorMessage
JIRA: https://jira.spring.io/browse/INT-4482 The outer try/catch sends an `ErrorMessage` for all exceptions; it should only do so for `MessageConversionException`. Integration flow exceptions will have been already handled by `MessageProducerSupport`. Also, populate the raw message header consistently - previously it only was populated for flow exceptions. Although the LEFE contains the raw message, it should be in the `ErrorMessage` header for consistency. **cherry-pick to 5.0.x, 4.3.x** * Polishing - PR Comments # Conflicts: # spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java # spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java
1 parent 30c83b0 commit 37c92a1

File tree

2 files changed

+14
-16
lines changed

2 files changed

+14
-16
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
2525
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
2626
import org.springframework.amqp.support.AmqpHeaders;
27+
import org.springframework.amqp.support.converter.MessageConversionException;
2728
import org.springframework.amqp.support.converter.MessageConverter;
2829
import org.springframework.amqp.support.converter.SimpleMessageConverter;
2930
import org.springframework.core.AttributeAccessor;
@@ -197,14 +198,11 @@ protected class Listener implements ChannelAwareMessageListener, RetryListener {
197198
@SuppressWarnings("unchecked")
198199
@Override
199200
public void onMessage(final Message message, final Channel channel) throws Exception {
201+
boolean retryDisabled = AmqpInboundChannelAdapter.this.retryTemplate == null;
200202
try {
201-
if (AmqpInboundChannelAdapter.this.retryTemplate == null) {
202-
try {
203-
processMessage(message, channel);
204-
}
205-
finally {
206-
attributesHolder.remove();
207-
}
203+
204+
if (retryDisabled) {
205+
processMessage(message, channel);
208206
}
209207
else {
210208
AmqpInboundChannelAdapter.this.retryTemplate.execute(new RetryCallback<Object, RuntimeException>() {
@@ -218,15 +216,21 @@ public Void doWithRetry(RetryContext context) throws RuntimeException {
218216
}, (RecoveryCallback<Object>) AmqpInboundChannelAdapter.this.recoveryCallback);
219217
}
220218
}
221-
catch (RuntimeException e) {
219+
catch (MessageConversionException e) {
222220
if (getErrorChannel() != null) {
221+
setAttributesIfNecessary(message, null);
223222
getMessagingTemplate().send(getErrorChannel(), buildErrorMessage(null,
224223
new ListenerExecutionFailedException("Message conversion failed", e, message)));
225224
}
226225
else {
227226
throw e;
228227
}
229228
}
229+
finally {
230+
if (retryDisabled) {
231+
attributesHolder.remove();
232+
}
233+
}
230234
}
231235

232236
private void processMessage(Message message, Channel channel) {

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -251,17 +251,11 @@ public void testAdapterConversionError() throws Exception {
251251
adapter.setOutputChannel(outputChannel);
252252
QueueChannel errorChannel = new QueueChannel();
253253
adapter.setErrorChannel(errorChannel);
254-
adapter.setMessageConverter(new MessageConverter() {
255-
256-
@Override
257-
public org.springframework.amqp.core.Message toMessage(Object object, MessageProperties messageProperties)
258-
throws MessageConversionException {
259-
throw new MessageConversionException("intended");
260-
}
254+
adapter.setMessageConverter(new SimpleMessageConverter() {
261255

262256
@Override
263257
public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException {
264-
return null;
258+
throw new MessageConversionException("intended");
265259
}
266260

267261
});

0 commit comments

Comments
 (0)