Skip to content

Commit 002382e

Browse files
artembilangaryrussell
authored andcommitted
Rely on MProducerSupport.active for Flux (#3423)
* Rely on `MProducerSupport.active` for `Flux` * Fix `MessageProducerSupport` to extract an `active` flag and set it before `isRunning` - the `Flux` subscription relies on the `takeWhile()` where in case of `autoStartup = false` we will never start consume because it is set to `true` already after `doStart()` * Refactor all the `MessageProducerSupport` implementation with similar `active` state to use already one from the super class **Cherry-pick to 5.3.x** * * Remove `MessageProducerSupport.setActive()` to not let to mutate it from the implementations * Set `active` to `false` in the `destroy()` * Clean up and fix typos in the affected `JmsMessageDrivenEndpoint` * * Pull `active` flag down to the `AbstractEndpoint` * Set `active = true` in the `start()` before calling `doStart()` * Do same for `active = false` in the `stop()` * Clean up `AbstractEndpoint` impls to not call `doStart/doStop` for nothing * Refactor endpoints to rely on the `active` state from the `AbstractEndpoint` not their own
1 parent 27b464a commit 002382e

File tree

23 files changed

+1167
-253
lines changed

23 files changed

+1167
-253
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,6 @@
4747
public abstract class AbstractEndpoint extends IntegrationObjectSupport
4848
implements SmartLifecycle, DisposableBean {
4949

50-
private boolean autoStartupSetExplicitly;
51-
52-
private volatile boolean autoStartup = true;
53-
54-
private volatile int phase = 0;
55-
56-
private volatile boolean running;
57-
5850
protected final ReentrantLock lifecycleLock = new ReentrantLock(); // NOSONAR
5951

6052
protected final Condition lifecycleCondition = this.lifecycleLock.newCondition(); // NOSONAR
@@ -63,6 +55,16 @@ public abstract class AbstractEndpoint extends IntegrationObjectSupport
6355

6456
private SmartLifecycleRoleController roleController;
6557

58+
private boolean autoStartup = true;
59+
60+
private boolean autoStartupSetExplicitly;
61+
62+
private int phase = 0;
63+
64+
private volatile boolean running;
65+
66+
private volatile boolean active;
67+
6668
public void setAutoStartup(boolean autoStartup) {
6769
this.autoStartup = autoStartup;
6870
this.autoStartupSetExplicitly = true;
@@ -120,6 +122,7 @@ protected void onInit() {
120122

121123
@Override
122124
public void destroy() {
125+
stop();
123126
if (this.roleController != null) {
124127
this.roleController.removeLifecycle(this);
125128
}
@@ -153,6 +156,7 @@ public final void start() {
153156
this.lifecycleLock.lock();
154157
try {
155158
if (!this.running) {
159+
this.active = true;
156160
doStart();
157161
this.running = true;
158162
if (logger.isInfoEnabled()) {
@@ -170,6 +174,7 @@ public final void stop() {
170174
this.lifecycleLock.lock();
171175
try {
172176
if (this.running) {
177+
this.active = false;
173178
doStop();
174179
this.running = false;
175180
if (logger.isInfoEnabled()) {
@@ -187,6 +192,7 @@ public final void stop(Runnable callback) {
187192
this.lifecycleLock.lock();
188193
try {
189194
if (this.running) {
195+
this.active = false;
190196
doStop(callback);
191197
this.running = false;
192198
if (logger.isInfoEnabled()) {
@@ -211,6 +217,10 @@ protected void doStop(Runnable callback) {
211217
callback.run();
212218
}
213219

220+
public boolean isActive() {
221+
return this.active;
222+
}
223+
214224
/**
215225
* Subclasses must implement this method with the start behavior.
216226
* This method will be invoked while holding the {@link #lifecycleLock}.

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ private Flux<Message<?>> createFluxGenerator() {
368368
triggerContext.lastActualExecutionTime(),
369369
new Date())
370370
)), 1)
371-
.repeat(this::isRunning)
371+
.repeat(this::isActive)
372372
.doOnSubscribe(subs -> this.subscription = subs);
373373
}
374374

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
222222
.map(this::trackMessageIfAny)
223223
.doOnComplete(this::stop)
224224
.doOnCancel(this::stop)
225-
.takeWhile((message) -> isRunning());
225+
.takeWhile((message) -> isActive());
226226

227227
if (channelForSubscription instanceof ReactiveStreamsSubscribableChannel) {
228228
((ReactiveStreamsSubscribableChannel) channelForSubscription).subscribeTo(messageFlux);

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,6 +1016,12 @@ private <T> T convert(Object source, Class<T> expectedReturnType) {
10161016
}
10171017
}
10181018

1019+
@Override
1020+
public void destroy() {
1021+
super.destroy();
1022+
this.gatewayMap.values().forEach(MethodInvocationGateway::destroy);
1023+
}
1024+
10191025
private static final class MethodInvocationGateway extends MessagingGatewaySupport {
10201026

10211027
private Expression receiveTimeoutExpression;

spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package org.springframework.integration.endpoint;
1818

19-
import static org.assertj.core.api.Assertions.assertThat;
19+
import java.time.Duration;
2020

2121
import org.junit.jupiter.api.Test;
2222

@@ -50,17 +50,18 @@ public class ReactiveMessageProducerTests {
5050

5151
@Test
5252
public void test() {
53-
assertThat(this.producer.isRunning()).isTrue();
54-
55-
StepVerifier.create(
56-
Flux.from(this.fluxMessageChannel)
57-
.map(Message::getPayload)
58-
.cast(String.class))
59-
.expectNext("test1", "test2")
60-
.thenCancel()
61-
.verify();
62-
63-
assertThat(this.producer.isRunning()).isFalse();
53+
StepVerifier stepVerifier =
54+
StepVerifier.create(
55+
Flux.from(this.fluxMessageChannel)
56+
.map(Message::getPayload)
57+
.cast(String.class))
58+
.expectNext("test1", "test2")
59+
.thenCancel()
60+
.verifyLater();
61+
62+
this.producer.start();
63+
64+
stepVerifier.verify(Duration.ofSeconds(10));
6465
}
6566

6667
@Configuration
@@ -79,10 +80,12 @@ public MessageProducerSupport producer() {
7980

8081
@Override
8182
protected void doStart() {
83+
super.doStart();
8284
subscribeToPublisher(Flux.just("test1", "test2").map(GenericMessage::new));
8385
}
8486

8587
};
88+
producer.setAutoStartup(false);
8689
producer.setOutputChannel(fluxMessageChannel());
8790
return producer;
8891
}

spring-integration-event/src/main/java/org/springframework/integration/event/inbound/ApplicationEventListeningMessageProducer.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@
4141
* @author Mark Fisher
4242
* @author Artem Bilan
4343
* @author Gary Russell
44+
*
4445
* @see ApplicationEventMulticaster
4546
* @see ExpressionMessageProducerSupport
4647
*/
@@ -51,8 +52,6 @@ public class ApplicationEventListeningMessageProducer extends ExpressionMessageP
5152

5253
private ApplicationEventMulticaster applicationEventMulticaster;
5354

54-
private volatile boolean active;
55-
5655
private volatile long stoppedAt;
5756

5857
public ApplicationEventListeningMessageProducer() {
@@ -66,14 +65,13 @@ public ApplicationEventListeningMessageProducer() {
6665
* In addition, this method re-registers the current instance as a {@link GenericApplicationListener}
6766
* with the {@link ApplicationEventMulticaster} which clears the listener cache. The cache will be
6867
* refreshed on the next appropriate {@link ApplicationEvent}.
69-
*
7068
* @param eventTypes The event types.
7169
* @see ApplicationEventMulticaster#addApplicationListener
7270
* @see #supportsEventType
7371
*/
7472
public final void setEventTypes(Class<?>... eventTypes) {
7573
Assert.notNull(eventTypes, "'eventTypes' must not be null");
76-
Set<ResolvableType> eventSet = new HashSet<ResolvableType>(eventTypes.length);
74+
Set<ResolvableType> eventSet = new HashSet<>(eventTypes.length);
7775
for (Class<?> eventType : eventTypes) {
7876
if (eventType != null) {
7977
eventSet.add(ResolvableType.forClass(eventType));
@@ -94,7 +92,7 @@ public String getComponentType() {
9492
@Override
9593
protected void onInit() {
9694
super.onInit();
97-
this.applicationEventMulticaster = this.getBeanFactory()
95+
this.applicationEventMulticaster = getBeanFactory()
9896
.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME,
9997
ApplicationEventMulticaster.class);
10098
Assert.notNull(this.applicationEventMulticaster,
@@ -104,21 +102,23 @@ protected void onInit() {
104102

105103
@Override
106104
public void onApplicationEvent(ApplicationEvent event) {
107-
if (this.active || ((event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent)
108-
&& this.stoppedRecently())) {
109-
if (event.getSource() instanceof Message<?>) {
110-
this.sendMessage((Message<?>) event.getSource());
105+
if (isActive() || ((event instanceof ContextStoppedEvent || event instanceof ContextClosedEvent)
106+
&& stoppedRecently())) {
107+
108+
Object source = event.getSource();
109+
if (source instanceof Message<?>) {
110+
sendMessage((Message<?>) source);
111111
}
112112
else {
113-
Message<?> message = null;
113+
Message<?> message;
114114
Object result = extractObjectToSend(event);
115115
if (result instanceof Message) {
116116
message = (Message<?>) result;
117117
}
118118
else {
119-
message = this.getMessageBuilderFactory().withPayload(result).build();
119+
message = getMessageBuilderFactory().withPayload(result).build();
120120
}
121-
this.sendMessage(message);
121+
sendMessage(message);
122122
}
123123
}
124124
}
@@ -175,15 +175,10 @@ public int getOrder() {
175175
return Ordered.LOWEST_PRECEDENCE;
176176
}
177177

178-
@Override
179-
protected void doStart() {
180-
this.active = true;
181-
}
182-
183178
@Override
184179
protected void doStop() {
185180
this.stoppedAt = System.currentTimeMillis();
186-
this.active = false;
181+
super.doStop();
187182
}
188183

189184
}

spring-integration-file/src/main/java/org/springframework/integration/file/tail/ApacheCommonsFileTailingMessageProducer.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,19 +23,21 @@
2323
* File tailer that delegates to the Apache Commons Tailer.
2424
*
2525
* @author Gary Russell
26+
* @author Artem Bilan
27+
*
2628
* @since 3.0
2729
*
2830
*/
2931
public class ApacheCommonsFileTailingMessageProducer extends FileTailingMessageProducerSupport
3032
implements TailerListener {
3133

32-
private volatile Tailer tailer;
34+
private long pollingDelay = 1000;
3335

34-
private volatile long pollingDelay = 1000;
36+
private boolean end = true;
3537

36-
private volatile boolean end = true;
38+
private boolean reopen = false;
3739

38-
private volatile boolean reopen = false;
40+
private volatile Tailer tailer;
3941

4042
/**
4143
* The delay between checks of the file for new content in milliseconds.
@@ -71,8 +73,8 @@ public String getComponentType() {
7173
@Override
7274
protected void doStart() {
7375
super.doStart();
74-
Tailer theTailer = new Tailer(this.getFile(), this, this.pollingDelay, this.end, this.reopen);
75-
this.getTaskExecutor().execute(theTailer);
76+
Tailer theTailer = new Tailer(getFile(), this, this.pollingDelay, this.end, this.reopen);
77+
getTaskExecutor().execute(theTailer);
7678
this.tailer = theTailer;
7779
}
7880

@@ -88,9 +90,9 @@ public void init(Tailer tailer) {
8890

8991
@Override
9092
public void fileNotFound() {
91-
this.publish("File not found: " + this.getFile().getAbsolutePath());
93+
publish("File not found: " + getFile().getAbsolutePath());
9294
try {
93-
Thread.sleep(this.getMissingFileDelay());
95+
Thread.sleep(getMissingFileDelay());
9496
}
9597
catch (InterruptedException e) {
9698
Thread.currentThread().interrupt();
@@ -99,7 +101,7 @@ public void fileNotFound() {
99101

100102
@Override
101103
public void fileRotated() {
102-
this.publish("File rotated: " + this.getFile().getAbsolutePath());
104+
publish("File rotated: " + getFile().getAbsolutePath());
103105
}
104106

105107
@Override
@@ -109,7 +111,7 @@ public void handle(String line) {
109111

110112
@Override
111113
public void handle(Exception ex) {
112-
this.publish(ex.getMessage());
114+
publish(ex.getMessage());
113115
}
114116

115117
}

spring-integration-file/src/main/java/org/springframework/integration/file/tail/FileTailingMessageProducerSupport.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ protected void publish(String message) {
145145

146146
@Override
147147
protected void doStart() {
148-
super.doStart();
149148
if (this.idleEventInterval > 0) {
150149
this.idleEventScheduledFuture = getTaskScheduler().scheduleWithFixedDelay(() -> {
151150
long now = System.currentTimeMillis();
@@ -162,7 +161,6 @@ protected void doStart() {
162161

163162
@Override
164163
protected void doStop() {
165-
super.doStop();
166164
if (this.idleEventScheduledFuture != null) {
167165
this.idleEventScheduledFuture.cancel(true);
168166
}

0 commit comments

Comments
 (0)