From cef9a4885b81ed1cd58c43ded67a8beaf2a2907b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 5 Sep 2025 10:41:31 +0200 Subject: [PATCH 1/5] Dispatch connection shutdown in appropriate threads For Netty. Make sure that connection shutdown sequence is not executed in the IO event loop if recovery follows. Recovery kicks in the shutdown sequence, so we could end up with a deadlock if the new connection is allocated to the same event loop. References #1663 --- pom.xml | 1 + .../rabbitmq/client/ConnectionFactory.java | 8 +- .../client/impl/AbstractMetricsCollector.java | 1 + .../com/rabbitmq/client/impl/Environment.java | 60 ++++++------ .../client/impl/NettyFrameHandlerFactory.java | 76 +++++++++++++-- .../client/AmqpClientTestExtension.java | 1 - .../test/IoDeadlockOnConnectionClosing.java | 94 +++++++++++++++++++ .../com/rabbitmq/client/test/TestUtils.java | 2 +- src/test/resources/logback-test.xml | 2 +- 9 files changed, 206 insertions(+), 39 deletions(-) create mode 100644 src/test/java/com/rabbitmq/client/test/IoDeadlockOnConnectionClosing.java diff --git a/pom.xml b/pom.xml index bdc40b4c9..f77dd4ba0 100644 --- a/pom.xml +++ b/pom.xml @@ -776,6 +776,7 @@ src/main/java/com/rabbitmq/client/ConnectionFactory.java src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java + src/main/java/com/rabbitmq/client/impl/Environment.java src/main/java/com/rabbitmq/client/observation/**/*.java src/test/java/com/rabbitmq/client/test/functional/MicrometerObservationCollectorMetrics.java src/test/java/com/rabbitmq/client/test/NettyTest.java diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index ad587697f..4dc19ba58 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -1063,6 +1063,10 @@ public ConnectionFactory setCredentialsRefreshService( protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException { if (netty) { if (this.frameHandlerFactory == null) { + Predicate recoveryCondition = + this.connectionRecoveryTriggeringCondition == null + ? AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION + : this.connectionRecoveryTriggeringCondition; this.frameHandlerFactory = new NettyFrameHandlerFactory( this.nettyConf.eventLoopGroup, @@ -1072,7 +1076,9 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO this.nettyConf.enqueuingTimeout, connectionTimeout, socketConf, - maxInboundMessageBodySize); + maxInboundMessageBodySize, + this.automaticRecovery, + recoveryCondition); } return this.frameHandlerFactory; } else { diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index 8f7c9b332..185072b8f 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -442,6 +442,7 @@ private ChannelState(Channel channel) { * * @deprecated Use {@link #markRejectedMessage(boolean)} instead */ + @Deprecated protected abstract void markRejectedMessage(); /** diff --git a/src/main/java/com/rabbitmq/client/impl/Environment.java b/src/main/java/com/rabbitmq/client/impl/Environment.java index 4475ae2eb..7ff0be958 100644 --- a/src/main/java/com/rabbitmq/client/impl/Environment.java +++ b/src/main/java/com/rabbitmq/client/impl/Environment.java @@ -1,4 +1,5 @@ -// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -12,40 +13,45 @@ // // If you have any questions regarding licensing, please contact us at // info@rabbitmq.com. - package com.rabbitmq.client.impl; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /** - * Infers information about the execution environment, e.g. - * security permissions. - * Package-protected API. + * Infers information about the execution environment, e.g. security permissions. Package-protected + * API. */ public class Environment { - /** - * This method is deprecated and subject to removal in the next major release. - * - * There is no replacement for this method, as it used to use the - * {@link SecurityManager}, which is itself deprecated and subject to removal. - * @deprecated - * @return always returns true - */ - @Deprecated - public static boolean isAllowedToModifyThreads() { - return true; - } + /** + * This method is deprecated and subject to removal in the next major release. + * + *

There is no replacement for this method, as it used to use the {@link SecurityManager}, + * which is itself deprecated and subject to removal. + * + * @deprecated + * @return always returns true + */ + @Deprecated + public static boolean isAllowedToModifyThreads() { + return true; + } + + static Thread newThread(Runnable runnable, String name) { + return newThread(Executors.defaultThreadFactory(), runnable, name); + } - public static Thread newThread(ThreadFactory factory, Runnable runnable, String name) { - Thread t = factory.newThread(runnable); - t.setName(name); - return t; - } + public static Thread newThread(ThreadFactory factory, Runnable runnable, String name) { + Thread t = factory.newThread(runnable); + t.setName(name); + return t; + } - public static Thread newThread(ThreadFactory factory, Runnable runnable, String name, boolean isDaemon) { - Thread t = newThread(factory, runnable, name); - t.setDaemon(isDaemon); - return t; - } + public static Thread newThread( + ThreadFactory factory, Runnable runnable, String name, boolean isDaemon) { + Thread t = newThread(factory, runnable, name); + t.setDaemon(isDaemon); + return t; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java index 263a0c01c..00d185365 100644 --- a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java @@ -23,6 +23,7 @@ import com.rabbitmq.client.Address; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MalformedFrameException; +import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.SocketConfigurator; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; @@ -61,6 +62,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; import javax.net.ssl.SSLHandshakeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +75,7 @@ public final class NettyFrameHandlerFactory extends AbstractFrameHandlerFactory private final Consumer channelCustomizer; private final Consumer bootstrapCustomizer; private final Duration enqueuingTimeout; + private final Predicate willRecover; public NettyFrameHandlerFactory( EventLoopGroup eventLoopGroup, @@ -82,7 +85,9 @@ public NettyFrameHandlerFactory( Duration enqueuingTimeout, int connectionTimeout, SocketConfigurator configurator, - int maxInboundMessageBodySize) { + int maxInboundMessageBodySize, + boolean automaticRecovery, + Predicate recoveryCondition) { super(connectionTimeout, configurator, sslContextFactory != null, maxInboundMessageBodySize); this.eventLoopGroup = eventLoopGroup; this.sslContextFactory = sslContextFactory == null ? connName -> null : sslContextFactory; @@ -90,6 +95,20 @@ public NettyFrameHandlerFactory( this.bootstrapCustomizer = bootstrapCustomizer == null ? Utils.noOpConsumer() : bootstrapCustomizer; this.enqueuingTimeout = enqueuingTimeout; + this.willRecover = + sse -> { + if (!automaticRecovery) { + return false; + } else { + try { + return recoveryCondition.test(sse); + } catch (Exception e) { + // we assume it will recover, so we take the safe path to dispatch the closing + // it avoids the risk of deadlock + return true; + } + } + }; } private static void closeNettyState(Channel channel, EventLoopGroup eventLoopGroup) { @@ -133,6 +152,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti sslContext, this.eventLoopGroup, this.enqueuingTimeout, + this.willRecover, this.channelCustomizer, this.bootstrapCustomizer); } @@ -163,6 +183,7 @@ private NettyFrameHandler( SslContext sslContext, EventLoopGroup elg, Duration enqueuingTimeout, + Predicate willRecover, Consumer channelCustomizer, Consumer bootstrapCustomizer) throws IOException { @@ -195,7 +216,8 @@ private NettyFrameHandler( int lengthFieldOffset = 3; int lengthFieldLength = 4; int lengthAdjustement = 1; - AmqpHandler amqpHandler = new AmqpHandler(maxInboundMessageBodySize, this::close); + AmqpHandler amqpHandler = + new AmqpHandler(maxInboundMessageBodySize, this::close, willRecover); int port = ConnectionFactory.portOrDefault(addr.getPort(), sslContext != null); b.handler( new ChannelInitializer() { @@ -404,14 +426,26 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter { private final int maxPayloadSize; private final Runnable closeSequence; + private final Predicate willRecover; private volatile AMQConnection connection; + private volatile Channel ch; private final AtomicBoolean writable = new AtomicBoolean(true); private final AtomicReference writableLatch = new AtomicReference<>(new CountDownLatch(1)); - private AmqpHandler(int maxPayloadSize, Runnable closeSequence) { + private AmqpHandler( + int maxPayloadSize, + Runnable closeSequence, + Predicate willRecover) { this.maxPayloadSize = maxPayloadSize; this.closeSequence = closeSequence; + this.willRecover = willRecover; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + this.ch = ctx.channel(); + super.channelActive(ctx); } @Override @@ -444,7 +478,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (noProblem && (!this.connection.isRunning() || this.connection.hasBrokerInitiatedShutdown())) { // looks like the frame was Close-Ok or Close - ctx.executor().submit(() -> this.connection.doFinalShutdown()); + this.dispatchShutdownToConnection(() -> this.connection.doFinalShutdown()); } } finally { m.release(); @@ -504,10 +538,10 @@ public void channelInactive(ChannelHandlerContext ctx) { AMQConnection c = this.connection; if (c.isOpen()) { // it is likely to be an IO exception - c.handleIoError(null); + this.dispatchShutdownToConnection(() -> c.handleIoError(null)); } else { // just in case, the call is idempotent anyway - c.doFinalShutdown(); + this.dispatchShutdownToConnection(c::doFinalShutdown); } } } @@ -533,7 +567,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc this.connection.getAddress().getHostName(), this.connection.getPort()); if (needToDispatchIoError()) { - this.connection.handleHeartbeatFailure(); + this.dispatchShutdownToConnection(() -> this.connection.handleHeartbeatFailure()); } } else if (e.state() == IdleState.WRITER_IDLE) { this.connection.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0)); @@ -545,7 +579,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc private void handleIoError(Throwable cause) { if (needToDispatchIoError()) { - this.connection.handleIoError(cause); + this.dispatchShutdownToConnection(() -> this.connection.handleIoError(cause)); } else { this.closeSequence.run(); } @@ -563,6 +597,32 @@ private boolean isWritable() { private CountDownLatch writableLatch() { return this.writableLatch.get(); } + + protected void dispatchShutdownToConnection(Runnable connectionShutdownRunnable) { + String name = "rabbitmq-connection-shutdown"; + AMQConnection c = this.connection; + if (c == null || ch == null) { + // not enough information, we dispatch in separate thread + Environment.newThread(connectionShutdownRunnable, name).start(); + } else { + if (ch.eventLoop().inEventLoop()) { + if (this.willRecover.test(c.getCloseReason())) { + // the connection will recover, we don't want this to happen in the event loop, + // it could cause a deadlock, so using a separate thread + name = name + "-" + c; + System.out.println("in separate thread"); + Environment.newThread(connectionShutdownRunnable, name).start(); + } else { + // no recovery, it is safe to dispatch in the event loop + System.out.println("in event loop"); + ch.eventLoop().submit(connectionShutdownRunnable); + } + } else { + // not in the event loop, we can run it in the same thread + connectionShutdownRunnable.run(); + } + } + } } private static final class ProtocolVersionMismatchHandler extends ChannelInboundHandlerAdapter { diff --git a/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java b/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java index 78242e9d5..93919f367 100644 --- a/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java +++ b/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java @@ -144,7 +144,6 @@ public void afterAll(ExtensionContext context) { try { eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS); } catch (InterruptedException e) { - LOGGER.debug("Error while asynchronously closing Netty event loop group", e); Thread.currentThread().interrupt(); } catch (Exception e) { LOGGER.warn("Error while asynchronously closing Netty event loop group", e); diff --git a/src/test/java/com/rabbitmq/client/test/IoDeadlockOnConnectionClosing.java b/src/test/java/com/rabbitmq/client/test/IoDeadlockOnConnectionClosing.java new file mode 100644 index 000000000..6eb5a10e9 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/IoDeadlockOnConnectionClosing.java @@ -0,0 +1,94 @@ +// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.IoHandlerFactory; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static com.rabbitmq.client.test.TestUtils.IO_NETTY; +import static com.rabbitmq.client.test.TestUtils.IO_SOCKET; +import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * + */ +public class IoDeadlockOnConnectionClosing { + + static final Logger LOGGER = LoggerFactory.getLogger(IoDeadlockOnConnectionClosing.class); + + EventLoopGroup eventLoopGroup; + ConnectionFactory cf; + List connections; + + @ParameterizedTest + @ValueSource(strings = {IO_NETTY, IO_SOCKET}) + public void connectionClosing(String io) throws Exception { + init(io); + try { + for (int i = 0; i < 10; i++) { + connections.add(cf.newConnection()); + } + closeAllConnectionsAndWaitForRecovery(connections); + for (Connection connection : connections) { + assertTrue(connection.isOpen()); + } + } finally { + tearDown(io); + } + } + + private void init(String io) { + connections = new ArrayList<>(); + cf = TestUtils.connectionFactory(); + if (IO_NETTY.equals(io)) { + IoHandlerFactory ioHandlerFactory = NioIoHandler.newFactory(); + this.eventLoopGroup = new MultiThreadIoEventLoopGroup(2, ioHandlerFactory); + cf.netty().eventLoopGroup(eventLoopGroup); + } else if (IO_SOCKET.equals(io)) { + cf.useBlockingIo(); + } else { + throw new IllegalArgumentException("Unknow IO layer: " + io); + } + } + + private void tearDown(String io) { + for (Connection connection : connections) { + try { + connection.close(2000); + } catch (Exception e) { + LOGGER.warn("Error while closing test connection", e); + } + } + if (IO_NETTY.equals(io)) { + this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + + } + +} diff --git a/src/test/java/com/rabbitmq/client/test/TestUtils.java b/src/test/java/com/rabbitmq/client/test/TestUtils.java index 69ce57afa..2deacc708 100644 --- a/src/test/java/com/rabbitmq/client/test/TestUtils.java +++ b/src/test/java/com/rabbitmq/client/test/TestUtils.java @@ -50,7 +50,7 @@ public class TestUtils { public static final String IO_LAYER = System.getProperty("io.layer", "netty"); - private static final String IO_SOCKET = "socket"; + public static final String IO_SOCKET = "socket"; public static final String IO_NETTY = "netty"; public static final List IO_LAYERS = Collections.unmodifiableList(Arrays.asList(IO_SOCKET, IO_NETTY)); diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 2a01f7d4e..18ef54dce 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -8,7 +8,7 @@ - + From 760025467d5a98dae48c6b874dfde54c885fc39c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 5 Sep 2025 10:47:44 +0200 Subject: [PATCH 2/5] Remove debugging code --- .../client/impl/NettyFrameHandlerFactory.java | 36 ------------------- 1 file changed, 36 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java index 00d185365..8d52adb77 100644 --- a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java @@ -52,8 +52,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.time.Duration; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -355,7 +353,6 @@ public void writeFrame(Frame frame) throws IOException { if (canWriteNow) { this.doWriteFrame(frame); } else { - this.handler.logEvents(); throw new IOException("Frame enqueuing failed"); } } catch (InterruptedException e) { @@ -485,42 +482,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } - private static class Event { - private final long time; - private final String label; - - public Event(long time, String label) { - this.time = time; - this.label = label; - } - - @Override - public String toString() { - return this.label + " " + this.time; - } - } - - private static final int MAX_EVENTS = 100; - private final Queue events = new ConcurrentLinkedQueue<>(); - - private void logEvents() { - if (this.events.size() > 0) { - long start = this.events.peek().time; - LOGGER.info("channel writability history:"); - events.forEach(e -> LOGGER.info("{}: {}", (e.time - start) / 1_000_000, e.label)); - } - } - @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { boolean canWrite = ctx.channel().isWritable(); - Event event = new Event(System.nanoTime(), Boolean.toString(canWrite)); - if (this.events.size() >= MAX_EVENTS) { - this.events.poll(); - this.events.offer(event); - } - this.events.add(event); - if (this.writable.compareAndSet(!canWrite, canWrite)) { if (canWrite) { CountDownLatch latch = writableLatch.getAndSet(new CountDownLatch(1)); From 5da520fdac1c00f000a402a75de609a3ce87722f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 5 Sep 2025 11:45:59 +0200 Subject: [PATCH 3/5] Make dispatching idempotent --- .../client/impl/NettyFrameHandlerFactory.java | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java index 8d52adb77..e883aaf67 100644 --- a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java @@ -429,6 +429,7 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter { private final AtomicBoolean writable = new AtomicBoolean(true); private final AtomicReference writableLatch = new AtomicReference<>(new CountDownLatch(1)); + private final AtomicBoolean shutdownDispatched = new AtomicBoolean(false); private AmqpHandler( int maxPayloadSize, @@ -563,27 +564,27 @@ private CountDownLatch writableLatch() { } protected void dispatchShutdownToConnection(Runnable connectionShutdownRunnable) { - String name = "rabbitmq-connection-shutdown"; - AMQConnection c = this.connection; - if (c == null || ch == null) { - // not enough information, we dispatch in separate thread - Environment.newThread(connectionShutdownRunnable, name).start(); - } else { - if (ch.eventLoop().inEventLoop()) { - if (this.willRecover.test(c.getCloseReason())) { - // the connection will recover, we don't want this to happen in the event loop, - // it could cause a deadlock, so using a separate thread - name = name + "-" + c; - System.out.println("in separate thread"); - Environment.newThread(connectionShutdownRunnable, name).start(); + if (this.shutdownDispatched.compareAndSet(false, true)) { + String name = "rabbitmq-connection-shutdown"; + AMQConnection c = this.connection; + if (c == null || ch == null) { + // not enough information, we dispatch in separate thread + Environment.newThread(connectionShutdownRunnable, name).start(); + } else { + if (ch.eventLoop().inEventLoop()) { + if (this.willRecover.test(c.getCloseReason()) || ch.eventLoop().isShuttingDown()) { + // the connection will recover, we don't want this to happen in the event loop, + // it could cause a deadlock, so using a separate thread + name = name + "-" + c; + Environment.newThread(connectionShutdownRunnable, name).start(); + } else { + // no recovery, it is safe to dispatch in the event loop + ch.eventLoop().submit(connectionShutdownRunnable); + } } else { - // no recovery, it is safe to dispatch in the event loop - System.out.println("in event loop"); - ch.eventLoop().submit(connectionShutdownRunnable); + // not in the event loop, we can run it in the same thread + connectionShutdownRunnable.run(); } - } else { - // not in the event loop, we can run it in the same thread - connectionShutdownRunnable.run(); } } } From 6d98b74387c8accf3175f054e3017c542e24e95a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 5 Sep 2025 14:38:59 +0200 Subject: [PATCH 4/5] Use test class name for thread prefix --- pom.xml | 1 + .../client/impl/NettyFrameHandlerFactory.java | 9 ++++-- .../client/AmqpClientTestExtension.java | 31 ++++++++++++++++++- src/test/resources/logback-test.xml | 2 +- 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index f77dd4ba0..2d4497644 100644 --- a/pom.xml +++ b/pom.xml @@ -774,6 +774,7 @@ + src/main/java/com/rabbitmq/AmqpClientTestExtension.java src/main/java/com/rabbitmq/client/ConnectionFactory.java src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java src/main/java/com/rabbitmq/client/impl/Environment.java diff --git a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java index e883aaf67..fe2ac1bdc 100644 --- a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java @@ -57,6 +57,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -430,6 +431,8 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter { private final AtomicReference writableLatch = new AtomicReference<>(new CountDownLatch(1)); private final AtomicBoolean shutdownDispatched = new AtomicBoolean(false); + private static final AtomicInteger SEQUENCE = new AtomicInteger(0); + private final String id; private AmqpHandler( int maxPayloadSize, @@ -438,6 +441,7 @@ private AmqpHandler( this.maxPayloadSize = maxPayloadSize; this.closeSequence = closeSequence; this.willRecover = willRecover; + this.id = "amqp-handler-" + SEQUENCE.getAndIncrement(); } @Override @@ -501,6 +505,7 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio public void channelInactive(ChannelHandlerContext ctx) { if (needToDispatchIoError()) { AMQConnection c = this.connection; + LOGGER.debug("Dispatching shutdown when channel became inactive ({})", this.id); if (c.isOpen()) { // it is likely to be an IO exception this.dispatchShutdownToConnection(() -> c.handleIoError(null)); @@ -565,7 +570,7 @@ private CountDownLatch writableLatch() { protected void dispatchShutdownToConnection(Runnable connectionShutdownRunnable) { if (this.shutdownDispatched.compareAndSet(false, true)) { - String name = "rabbitmq-connection-shutdown"; + String name = "rabbitmq-connection-shutdown-" + this.id; AMQConnection c = this.connection; if (c == null || ch == null) { // not enough information, we dispatch in separate thread @@ -575,7 +580,7 @@ protected void dispatchShutdownToConnection(Runnable connectionShutdownRunnable) if (this.willRecover.test(c.getCloseReason()) || ch.eventLoop().isShuttingDown()) { // the connection will recover, we don't want this to happen in the event loop, // it could cause a deadlock, so using a separate thread - name = name + "-" + c; + // name = name + "-" + c; Environment.newThread(connectionShutdownRunnable, name).start(); } else { // no recovery, it is safe to dispatch in the event loop diff --git a/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java b/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java index 93919f367..e4b69de46 100644 --- a/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java +++ b/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java @@ -31,6 +31,9 @@ import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; @@ -105,7 +108,8 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con @Override public void beforeAll(ExtensionContext context) { if (TestUtils.isNetty()) { - EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + ThreadFactory tf = new NamedThreadFactory(context.getTestClass().get().getSimpleName() + "-"); + EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup(tf, NioIoHandler.newFactory()); store(context) .put("nettyEventLoopGroup", eventLoopGroup); TestUtils.eventLoopGroup(eventLoopGroup); @@ -165,4 +169,29 @@ public void close() { this.executorService.shutdownNow(); } } + + private static class NamedThreadFactory implements ThreadFactory { + + private final ThreadFactory backingThreadFactory; + + private final String prefix; + + private final AtomicLong count = new AtomicLong(0); + + private NamedThreadFactory(String prefix) { + this(Executors.defaultThreadFactory(), prefix); + } + + private NamedThreadFactory(ThreadFactory backingThreadFactory, String prefix) { + this.backingThreadFactory = backingThreadFactory; + this.prefix = prefix; + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = this.backingThreadFactory.newThread(r); + thread.setName(prefix + count.getAndIncrement()); + return thread; + } + } } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 18ef54dce..2a01f7d4e 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -8,7 +8,7 @@ - + From 5da2fc3465da6462d2307893b5112e51433294aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 8 Sep 2025 09:07:25 +0200 Subject: [PATCH 5/5] Check Netty event loop group is open before creating frame handler No need to try to connect if the event loop group was shut down. This also triggers an infinite cycle of connection recovery in the following case: connection disconnected, recovery starts, event loop group closed, new connection attempt, Netty channel created and becomes inactive immediately, recovery restarts, etc. To avoid the recovery loop, stop recovery when the exception (IllegalStateException) is thrown. References #1663 --- .../client/impl/NettyFrameHandlerFactory.java | 12 +++++ .../recovery/AutorecoveringConnection.java | 50 ++++++++++--------- .../RecoveryAwareAMQConnectionFactory.java | 4 +- .../client/AmqpClientTestExtension.java | 23 +++++---- .../rabbitmq/client/test/BrokerTestCase.java | 7 ++- .../rabbitmq/client/test/ClientTestSuite.java | 1 + .../test/functional/ConnectionRecovery.java | 25 +++++----- .../test/functional/ExceptionHandling.java | 30 +++++------ .../client/test/functional/Metrics.java | 24 ++++----- src/test/java/com/rabbitmq/tools/Host.java | 2 +- 10 files changed, 101 insertions(+), 77 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java index fe2ac1bdc..abac9151a 100644 --- a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java @@ -200,6 +200,14 @@ private NettyFrameHandler( } else { this.eventLoopGroup = null; } + + if (b.config().group() == null) { + throw new IllegalStateException("The event loop group is not set"); + } else if (b.config().group().isShuttingDown()) { + LOGGER.warn("The Netty loop group was shut down, it is not possible to connect or recover"); + throw new IllegalStateException("The event loop group was shut down"); + } + if (b.config().channelFactory() == null) { b.channel(NioSocketChannel.class); } @@ -317,6 +325,10 @@ public void sendHeader() { @Override public void initialize(AMQConnection connection) { + LOGGER.debug( + "Setting connection {} to AMQP handler {}", + connection.getClientProvidedName(), + this.handler.id); this.handler.connection = connection; } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 0e3e82d95..cfc2fbce4 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -591,16 +591,16 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException { } LOGGER.debug("Connection {} has recovered", newConn); this.addAutomaticRecoveryListener(newConn); - this.recoverShutdownListeners(newConn); - this.recoverBlockedListeners(newConn); - this.recoverChannels(newConn); - // don't assign new delegate connection until channel recovery is complete - this.delegate = newConn; - if (this.params.isTopologyRecoveryEnabled()) { - notifyTopologyRecoveryListenersStarted(); - recoverTopology(params.getTopologyRecoveryExecutor()); - } - this.notifyRecoveryListenersComplete(); + this.recoverShutdownListeners(newConn); + this.recoverBlockedListeners(newConn); + this.recoverChannels(newConn); + // don't assign new delegate connection until channel recovery is complete + this.delegate = newConn; + if (this.params.isTopologyRecoveryEnabled()) { + notifyTopologyRecoveryListenersStarted(); + recoverTopology(params.getTopologyRecoveryExecutor()); + } + this.notifyRecoveryListenersComplete(); } private void recoverShutdownListeners(final RecoveryAwareAMQConnection newConn) { @@ -624,25 +624,27 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti attempts++; // No Sonar: no need to close this resource because we're the one that creates it // and hands it over to the user - RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR - synchronized(recoveryLock) { - if (!manuallyClosed) { - // This is the standard case. - return newConn; - } - } - // This is the once in a blue moon case. - // Application code just called close as the connection - // was being re-established. So we attempt to close the newly created connection. - newConn.abort(); - return null; + RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR + synchronized(recoveryLock) { + if (!manuallyClosed) { + // This is the standard case. + return newConn; + } + } + // This is the once in a blue moon case. + // Application code just called close as the connection + // was being re-established. So we attempt to close the newly created connection. + newConn.abort(); + return null; + } catch (IllegalStateException e) { + this.getExceptionHandler().handleConnectionRecoveryException(this, e); + return null; } catch (Exception e) { Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(attempts)); this.getExceptionHandler().handleConnectionRecoveryException(this, e); } } - - return null; + return null; } private void recoverChannels(final RecoveryAwareAMQConnection newConn) { diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java index b4754a217..ac4d529bc 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java @@ -71,10 +71,8 @@ public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutExc conn.start(); metricsCollector.newConnection(conn); return conn; - } catch (IOException e) { + } catch (IOException | TimeoutException e) { lastException = e; - } catch (TimeoutException te) { - lastException = te; } } diff --git a/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java b/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java index e4b69de46..3e6793218 100644 --- a/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java +++ b/src/test/java/com/rabbitmq/client/AmqpClientTestExtension.java @@ -143,16 +143,19 @@ public void afterAll(ExtensionContext context) { .getRoot() .getStore(ExtensionContext.Namespace.GLOBAL) .getOrComputeIfAbsent(ExecutorServiceCloseableResourceWrapper.class); - wrapper.executorService.submit( - () -> { - try { - eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOGGER.warn("Error while asynchronously closing Netty event loop group", e); - } - }); + + wrapper + .executorService + .submit( + () -> { + try { + eventLoopGroup.shutdownGracefully(0, 0, SECONDS).get(10, SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOGGER.warn("Error while asynchronously closing Netty event loop group", e); + } + }); } } diff --git a/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java b/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java index 4bb63c02c..26fcffb0d 100644 --- a/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java +++ b/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java @@ -125,7 +125,7 @@ protected void bareRestart() public void openConnection() throws IOException, TimeoutException { if (connection == null) { - connection = connectionFactory.newConnection(UUID.randomUUID().toString()); + connection = connectionFactory.newConnection(generateConnectionName()); } } @@ -327,6 +327,11 @@ protected String generateExchangeName() { this.testInfo.getTestMethod().get().getName()); } + protected String generateConnectionName() { + return name("conn", this.testInfo.getTestClass().get(), + this.testInfo.getTestMethod().get().getName()); + } + private static String name(String prefix, Class testClass, String testMethodName) { String uuid = UUID.randomUUID().toString(); return String.format( diff --git a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java index a48710fe9..2e968b21b 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java @@ -73,6 +73,7 @@ ValueWriterTest.class, BlockedConnectionTest.class, NettyTest.class, + IoDeadlockOnConnectionClosing.class, ProtocolVersionMismatch.class }) public class ClientTestSuite { diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index 5145929eb..573831783 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -57,7 +57,7 @@ public class ConnectionRecovery extends BrokerTestCase { @Test public void namedConnectionRecovery() throws IOException, InterruptedException, TimeoutException { - String connectionName = "custom-name"; + String connectionName = generateConnectionName(); RecoverableConnection c = newRecoveringConnection(connectionName); try { assertThat(c.isOpen()).isTrue(); @@ -151,7 +151,7 @@ public String getPassword() { return password; } }); - RecoverableConnection c = (RecoverableConnection) cf.newConnection(UUID.randomUUID().toString()); + RecoverableConnection c = (RecoverableConnection) cf.newConnection(generateConnectionName()); try { assertThat(c.isOpen()).isTrue(); assertThat(usernameRequested.get()).isEqualTo(1); @@ -787,13 +787,14 @@ public void handleDelivery(String consumerTag, @Test public void recoveryWithExponentialBackoffDelayHandler() throws Exception { ConnectionFactory connectionFactory = TestUtils.connectionFactory(); connectionFactory.setRecoveryDelayHandler(new RecoveryDelayHandler.ExponentialBackoffDelayHandler()); - Connection testConnection = connectionFactory.newConnection(UUID.randomUUID().toString()); + String connName = generateConnectionName(); + Connection testConnection = connectionFactory.newConnection(connName); try { assertThat(testConnection.isOpen()).isTrue(); TestUtils.closeAndWaitForRecovery((RecoverableConnection) testConnection); assertThat(testConnection.isOpen()).isTrue(); } finally { - connection.close(); + testConnection.close(); } } @@ -807,7 +808,7 @@ public void handleDelivery(String consumerTag, connectionFactory.setTopologyRecoveryExecutor(executor); assertThat(connectionFactory.getTopologyRecoveryExecutor()).isEqualTo(executor); RecoverableConnection testConnection = (RecoverableConnection) connectionFactory.newConnection( - UUID.randomUUID().toString() + generateConnectionName() ); try { final List channels = new ArrayList(); @@ -970,26 +971,26 @@ protected ConnectionFactory newConnectionFactory() { return buildConnectionFactoryWithRecoveryEnabled(false); } - private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery) + private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery); - return (AutorecoveringConnection) cf.newConnection(UUID.randomUUID().toString()); + return (AutorecoveringConnection) cf.newConnection(generateConnectionName()); } - private static RecoverableConnection newRecoveringConnection(Address[] addresses) + private RecoverableConnection newRecoveringConnection(Address[] addresses) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(false); // specifically use the Address[] overload - return (AutorecoveringConnection) cf.newConnection(addresses, UUID.randomUUID().toString()); + return (AutorecoveringConnection) cf.newConnection(addresses, generateConnectionName()); } - private static RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List

addresses) + private RecoverableConnection newRecoveringConnection(boolean disableTopologyRecovery, List
addresses) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery); - return (AutorecoveringConnection) cf.newConnection(addresses, UUID.randomUUID().toString()); + return (AutorecoveringConnection) cf.newConnection(addresses, generateConnectionName()); } - private static RecoverableConnection newRecoveringConnection(List
addresses) + private RecoverableConnection newRecoveringConnection(List
addresses) throws IOException, TimeoutException { return newRecoveringConnection(false, addresses); } diff --git a/src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java b/src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java index ef71a9419..fe46bf3f2 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ExceptionHandling.java @@ -40,6 +40,7 @@ public class ExceptionHandling { private ConnectionFactory newConnectionFactory(ExceptionHandler eh) { ConnectionFactory cf = TestUtils.connectionFactory(); + cf.setNetworkRecoveryInterval(2000); cf.setExceptionHandler(eh); return cf; } @@ -74,21 +75,22 @@ protected void testConsumerHandleConsumerException(ExceptionHandler eh, CountDow throws InterruptedException, TimeoutException, IOException { ConnectionFactory cf = newConnectionFactory(eh); assertEquals(cf.getExceptionHandler(), eh); - Connection conn = cf.newConnection(); - assertEquals(conn.getExceptionHandler(), eh); - Channel ch = conn.createChannel(); - String q = ch.queueDeclare().getQueue(); - ch.basicConsume(q, new DefaultConsumer(ch) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, - AMQP.BasicProperties properties, byte[] body) throws IOException { - throw new RuntimeException("exception expected here, don't freak out"); - } - }); - ch.basicPublish("", q, null, "".getBytes()); - wait(latch); + try (Connection conn = cf.newConnection()) { + assertEquals(conn.getExceptionHandler(), eh); + Channel ch = conn.createChannel(); + String q = ch.queueDeclare().getQueue(); + ch.basicConsume(q, new DefaultConsumer(ch) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) throws IOException { + throw new RuntimeException("exception expected here, don't freak out"); + } + }); + ch.basicPublish("", q, null, "".getBytes()); + wait(latch); - assertEquals(!expectChannelClose, ch.isOpen()); + assertEquals(!expectChannelClose, ch.isOpen()); + } } @Test public void nullExceptionHandler() { diff --git a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java index 853138014..a66bcb8f6 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java +++ b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java @@ -82,7 +82,7 @@ public void metrics(ConnectionFactory connectionFactory) throws IOException, Tim Connection connection1 = null; Connection connection2 = null; try { - connection1 = connectionFactory.newConnection(); + connection1 = connectionFactory.newConnection(generateConnectionName()); assertThat(metrics.getConnections().getCount()).isEqualTo(1L); connection1.createChannel(); @@ -102,7 +102,7 @@ public void metrics(ConnectionFactory connectionFactory) throws IOException, Tim channel.basicGet(QUEUE, true); assertThat(metrics.getConsumedMessages().getCount()).isEqualTo(2L); - connection2 = connectionFactory.newConnection(); + connection2 = connectionFactory.newConnection(generateConnectionName()); assertThat(metrics.getConnections().getCount()).isEqualTo(2L); connection2.createChannel(); @@ -142,7 +142,7 @@ public void metricsPublisherUnrouted(ConnectionFactory connectionFactory) throws connectionFactory.setMetricsCollector(metrics); Connection connection = null; try { - connection = connectionFactory.newConnection(); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel = connection.createChannel(); channel.confirmSelect(); assertThat(metrics.getPublishUnroutedMessages().getCount()).isEqualTo(0L); @@ -168,7 +168,7 @@ public void metricsPublisherAck(ConnectionFactory connectionFactory) throws IOEx connectionFactory.setMetricsCollector(metrics); Connection connection = null; try { - connection = connectionFactory.newConnection(); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel = connection.createChannel(); channel.confirmSelect(); assertThat(metrics.getPublishAcknowledgedMessages().getCount()).isEqualTo(0L); @@ -196,7 +196,7 @@ public void metricsAck(ConnectionFactory connectionFactory) throws IOException, Connection connection = null; try { - connection = connectionFactory.newConnection(); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel1 = connection.createChannel(); Channel channel2 = connection.createChannel(); @@ -264,7 +264,7 @@ public void metricsReject(ConnectionFactory connectionFactory) throws IOExceptio Connection connection = null; try { - connection = connectionFactory.newConnection(); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel = connection.createChannel(); sendMessage(channel); @@ -304,7 +304,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF try { Channel [] channels = new Channel[nbChannels]; for(int i = 0; i < nbConnections; i++) { - connections[i] = connectionFactory.newConnection(); + connections[i] = connectionFactory.newConnection(generateConnectionName()); for(int j = 0; j < nbChannelsPerConnection; j++) { Channel channel = connections[i].createChannel(); channel.basicQos(1); @@ -347,7 +347,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF executorService.shutdownNow(); executorService = Executors.newFixedThreadPool(nbTasks); - tasks = new ArrayList>(); + tasks = new ArrayList<>(); for(int i = 0; i < nbTasks; i++) { Channel channelForConsuming = channels[i]; tasks.add(random.nextBoolean() ? @@ -376,7 +376,7 @@ public void multiThreadedMetricsStandardConnection(ConnectionFactory connectionF executorService.shutdownNow(); executorService = Executors.newFixedThreadPool(nbTasks); - tasks = new ArrayList>(); + tasks = new ArrayList<>(); for(int i = 0; i < nbTasks; i++) { Channel channelForConsuming = channels[i]; tasks.add(random.nextBoolean() ? @@ -405,7 +405,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti Connection connection = null; try { - connection = connectionFactory.newConnection(); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel = connection.createChannel(); assertThat(metrics.getConnections().getCount()).isEqualTo(1L); @@ -429,7 +429,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti Connection connection = null; try { - connection = connectionFactory.newConnection(UUID.randomUUID().toString()); + connection = connectionFactory.newConnection(generateConnectionName()); Collection shutdownHooks = getShutdownHooks(connection); assertThat(shutdownHooks.size()).isEqualTo(0); @@ -459,7 +459,7 @@ public void errorInChannel(ConnectionFactory connectionFactory) throws IOExcepti Connection connection = null; try { - connection = connectionFactory.newConnection(UUID.randomUUID().toString()); + connection = connectionFactory.newConnection(generateConnectionName()); Channel channel1 = connection.createChannel(); AtomicInteger ackedMessages = new AtomicInteger(0); diff --git a/src/test/java/com/rabbitmq/tools/Host.java b/src/test/java/com/rabbitmq/tools/Host.java index e9608ec15..76d63971f 100644 --- a/src/test/java/com/rabbitmq/tools/Host.java +++ b/src/test/java/com/rabbitmq/tools/Host.java @@ -40,7 +40,7 @@ public class Host { private static final Logger LOGGER = LoggerFactory.getLogger(Host.class); private static final String DOCKER_PREFIX = "DOCKER:"; - private static final Pattern CONNECTION_NAME_PATTERN = Pattern.compile("\"connection_name\",\"(?[a-zA-Z0-9\\-]+)?\""); + private static final Pattern CONNECTION_NAME_PATTERN = Pattern.compile("\"connection_name\",\"(?[a-zA-Z0-9\\-_]+)?\""); public static String hostname() { try {