Skip to content

Commit ee2b6b4

Browse files
authored
Fix MQTT Inbound readyToSubscribeOnStart logic (#8557)
We cannot always reset `readyToSubscribeOnStart` to `false` in the `stop()` since we may be connected to `ClientManager` and have `cleanStart` option, so we `unsubscribe()` in the stop and then when we start there is no way to subscribe back * Reset `readyToSubscribeOnStart` in the `AbstractMqttMessageDrivenChannelAdapter.doStop()` only if we don't unsubscribe for a `cleanSession` reason. * Also reset it in the `connectionLost`, so next `connectComplete` will take care about subscription or `readyToSubscribeOnStart` state change * Use `isActive()` instead of `isRunning()` in the `connectComplete()` since there is a race condition when connection is established but `doStart()` has not returned yet, but already passed `if (this.readyToSubscribeOnStart)` line
1 parent ac577e9 commit ee2b6b4

File tree

3 files changed

+52
-12
lines changed

3 files changed

+52
-12
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,9 @@ protected synchronized void doStop() {
215215
&& this.clientFactory.getConnectionOptions().isCleanSession())) {
216216

217217
this.client.unsubscribe(getTopic());
218+
// Have to re-subscribe on next start if connection is not lost.
219+
this.readyToSubscribeOnStart = true;
220+
218221
}
219222
}
220223
catch (MqttException ex1) {
@@ -341,6 +344,10 @@ public synchronized void connectionLost(Throwable cause) {
341344
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
342345
}
343346
}
347+
else {
348+
// The 'connectComplete()' re-subscribes or sets this flag otherwise.
349+
this.readyToSubscribeOnStart = false;
350+
}
344351
}
345352

346353
@Override
@@ -404,7 +411,9 @@ public void connectComplete(boolean isReconnect) {
404411

405412
@Override
406413
public void connectComplete(boolean reconnect, String serverURI) {
407-
if (isRunning()) {
414+
// The 'running' flag is set after 'doStart()', so possible a race condition
415+
// when start is not finished yet, but server answers with successful connection.
416+
if (isActive()) {
408417
subscribe();
409418
}
410419
else {

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -223,15 +223,17 @@ private synchronized void connect() throws MqttException {
223223

224224
@Override
225225
protected void doStop() {
226-
this.readyToSubscribeOnStart = false;
227226
this.topicLock.lock();
227+
this.readyToSubscribeOnStart = false;
228228
String[] topics = getTopic();
229229
try {
230230
if (this.mqttClient != null && this.mqttClient.isConnected()) {
231231
if (this.connectionOptions.isCleanStart()) {
232232
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
233-
}
233+
// Have to re-subscribe on next start if connection is not lost.
234+
this.readyToSubscribeOnStart = true;
234235

236+
}
235237
if (getClientManager() == null) {
236238
this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout());
237239
}
@@ -331,10 +333,16 @@ public void messageArrived(String topic, MqttMessage mqttMessage) {
331333

332334
@Override
333335
public void disconnected(MqttDisconnectResponse disconnectResponse) {
334-
MqttException cause = disconnectResponse.getException();
335-
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
336-
if (applicationEventPublisher != null) {
337-
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
336+
if (isRunning()) {
337+
MqttException cause = disconnectResponse.getException();
338+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
339+
if (applicationEventPublisher != null) {
340+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
341+
}
342+
}
343+
else {
344+
// The 'connectComplete()' re-subscribes or sets this flag otherwise.
345+
this.readyToSubscribeOnStart = false;
338346
}
339347
}
340348

@@ -358,7 +366,9 @@ public void connectComplete(boolean isReconnect) {
358366

359367
@Override
360368
public void connectComplete(boolean reconnect, String serverURI) {
361-
if (isRunning()) {
369+
// The 'running' flag is set after 'doStart()', so possible a race condition
370+
// when start is not finished yet, but server answers with successful connection.
371+
if (isActive()) {
362372
subscribe();
363373
}
364374
else {

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/ResubscribeAfterAutomaticReconnectTests.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,35 @@ public class ResubscribeAfterAutomaticReconnectTests implements MosquittoContain
6060
@Autowired
6161
private MqttConnectionOptions connectionOptions;
6262

63+
@Autowired
64+
Mqttv5PahoMessageDrivenChannelAdapter pahoMessageDrivenChannelAdapter;
65+
6366
@Autowired
6467
Config config;
6568

6669
@Test
6770
void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedException {
6871
GenericMessage<String> testMessage = new GenericMessage<>("test");
72+
73+
assertThat(this.config.subscribeFirstLatch.await(10, TimeUnit.SECONDS)).isTrue();
74+
6975
this.mqttOutFlowInput.send(testMessage);
7076
assertThat(this.fromMqttChannel.receive(10_000)).isNotNull();
7177

7278
MOSQUITTO_CONTAINER.stop();
7379
MOSQUITTO_CONTAINER.start();
7480
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
7581

76-
assertThat(this.config.subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
82+
assertThat(this.config.subscribeSecondLatch.await(10, TimeUnit.SECONDS)).isTrue();
83+
84+
this.mqttOutFlowInput.send(testMessage);
85+
assertThat(this.fromMqttChannel.receive(10_000)).isNotNull();
86+
87+
// Re-subscription on channel adapter restart with cleanStart
88+
this.pahoMessageDrivenChannelAdapter.stop();
89+
this.pahoMessageDrivenChannelAdapter.start();
90+
91+
assertThat(this.config.subscribeThirdLatch.await(10, TimeUnit.SECONDS)).isTrue();
7792

7893
this.mqttOutFlowInput.send(testMessage);
7994
assertThat(this.fromMqttChannel.receive(10_000)).isNotNull();
@@ -83,13 +98,18 @@ void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedExce
8398
@EnableIntegration
8499
public static class Config {
85100

86-
CountDownLatch subscribeLatch = new CountDownLatch(2);
101+
CountDownLatch subscribeFirstLatch = new CountDownLatch(1);
102+
103+
CountDownLatch subscribeSecondLatch = new CountDownLatch(2);
104+
105+
CountDownLatch subscribeThirdLatch = new CountDownLatch(3);
87106

88107
@Bean
89108
public MqttConnectionOptions mqttConnectOptions() {
90109
return new MqttConnectionOptionsBuilder()
91110
.serverURI(MosquittoContainerTest.mqttUrl())
92111
.automaticReconnect(true)
112+
.cleanStart(true)
93113
.build();
94114
}
95115

@@ -105,15 +125,16 @@ public IntegrationFlow mqttOutFlow(MqttConnectionOptions mqttConnectOptions) {
105125
public IntegrationFlow mqttInFlow(MqttConnectionOptions mqttConnectOptions) {
106126
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
107127
new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectOptions, "mqttInClient", "siTest");
108-
109128
return IntegrationFlow.from(messageProducer)
110129
.channel(c -> c.queue("fromMqttChannel"))
111130
.get();
112131
}
113132

114133
@EventListener(MqttSubscribedEvent.class)
115134
public void mqttEvents() {
116-
this.subscribeLatch.countDown();
135+
this.subscribeFirstLatch.countDown();
136+
this.subscribeSecondLatch.countDown();
137+
this.subscribeThirdLatch.countDown();
117138
}
118139

119140
}

0 commit comments

Comments
 (0)