Revise MQTT Message Handlers logic around reconnection #10697

@artembilan

The current logic is like this:

	@Override
	protected void publish(String topic, Object mqttMessage, Message<?> message) {
		Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'");
		long completionTimeout = getCompletionTimeout();
		try {
			if (!this.mqttClient.isConnected()) {
				this.lock.lock();
				try {
					if (!this.mqttClient.isConnected()) {
						this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
					}
				}
				finally {
					this.lock.unlock();
				}
			}

Even with the double check of this.mqttClient.isConnected(), the this.mqttClient.connect() call can still race with other consumers of the same mqttClient and fail with:

Caused by: org.eclipse.paho.mqttv5.common.MqttException: Connect already in progress	
	at org.eclipse.paho.mqttv5.client.MqttAsyncClient.connect(MqttAsyncClient.java:734)	
	at org.eclipse.paho.mqttv5.client.MqttAsyncClient.connect(MqttAsyncClient.java:715)	
	at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:249)

This exception does not mean that we are ready to publish yet, but it does at least mean that we can check this.mqttClient.isConnected() in a while loop in the catch block.
According to the Paho library:

		if (comms.isConnected()) {
			throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_CLIENT_CONNECTED);
		}
		if (comms.isConnecting()) {
			throw new MqttException(MqttClientException.REASON_CODE_CONNECT_IN_PROGRESS);
		}

It looks like we need to check for both errors (REASON_CODE_CLIENT_CONNECTED and REASON_CODE_CONNECT_IN_PROGRESS) in the catch block.
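
A minimal sketch of what such a catch block could look like, not a proposed patch. To stay runnable without Paho on the classpath it uses hypothetical stand-ins: `Client` and `ClientException` replace the Paho client and `MqttException`, `connectAndWait` replaces `connect(...).waitForCompletion(...)`, and `ensureConnected` is an invented helper name. The two constants reuse the names from the Paho snippet above; their numeric values here are illustrative. The 10 ms poll interval and the choice to rethrow the original exception on timeout are assumptions as well.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins so the sketch runs without the Paho library.
final class ReconnectSketch {

    // Names mirror the MqttClientException constants quoted above; values are illustrative.
    static final int REASON_CODE_CLIENT_CONNECTED = 32100;
    static final int REASON_CODE_CONNECT_IN_PROGRESS = 32110;

    /** Stand-in for MqttException: only carries a reason code. */
    static final class ClientException extends Exception {
        private final int reasonCode;

        ClientException(int reasonCode) {
            super("reason code " + reasonCode);
            this.reasonCode = reasonCode;
        }

        int getReasonCode() {
            return this.reasonCode;
        }
    }

    /** Stand-in for the small part of the MQTT client used here. */
    interface Client {
        boolean isConnected();

        // Stand-in for connect(options).waitForCompletion(timeout).
        void connectAndWait(long timeoutMillis) throws ClientException;
    }

    /** Connects if needed, tolerating a connect started by another consumer. */
    static void ensureConnected(Client client, long completionTimeoutMillis)
            throws ClientException, InterruptedException {
        if (client.isConnected()) {
            return;
        }
        try {
            client.connectAndWait(completionTimeoutMillis);
        }
        catch (ClientException ex) {
            int code = ex.getReasonCode();
            if (code == REASON_CODE_CLIENT_CONNECTED) {
                return; // another consumer finished connecting first
            }
            if (code != REASON_CODE_CONNECT_IN_PROGRESS) {
                throw ex; // unrelated failure: propagate
            }
            // Another consumer is connecting: poll until connected or timed out.
            long deadline = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(completionTimeoutMillis);
            while (!client.isConnected()) {
                if (System.nanoTime() - deadline >= 0) {
                    throw ex; // still not connected within the timeout
                }
                Thread.sleep(10);
            }
        }
    }
}
```

In the real handler this logic would sit inside the existing lock and `try` block of `publish()`. Whether to poll, or to wait on the in-flight connect in some other way, is a design decision this sketch does not settle.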
