diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java index 756ffbed7d2..6b2798cf6ca 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java @@ -99,6 +99,21 @@ public void resetStats(){ } } + @Override + public long getStartedTimestamp() { + return bridge.getStartedTimestamp(); + } + + @Override + public long getLocalExceptionCount() { + return bridge.getLocalExceptionCount(); + } + + @Override + public long getRemoteExceptionCount() { + return bridge.getRemoteExceptionCount(); + } + public void addNetworkDestinationView(NetworkDestinationView networkDestinationView){ networkDestinationViewList.add(networkDestinationView); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java index 82fc9ca1899..34d7d4b198e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java @@ -40,4 +40,9 @@ public interface NetworkBridgeViewMBean extends Service { void resetStats(); + long getStartedTimestamp(); + + long getLocalExceptionCount(); + + long getRemoteExceptionCount(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java index 05e6747dd7e..09e7da0755e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java @@ -203,4 +203,29 @@ public void setRemoteUserName(String remoteUserName) { public boolean isAutoStart() { return connector.isAutoStart(); } + + @Override + public long getStartedTimestamp() { + return connector.getStartedTimestamp(); + } + + @Override + public long getStoppedTimestamp() { + return connector.getStoppedTimestamp(); + } + + @Override + public long getBridgeExceptionCount() { + return connector.getBridgeExceptionCounter().getCount(); + } + + @Override + public long getLocalExceptionCount() { + return connector.getLocalExceptionCounter().getCount(); + } + + @Override + public long getRemoteExceptionCount() { + return connector.getRemoteExceptionCounter().getCount(); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java index e51b9a16caf..47a8da6c07d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java @@ -90,4 +90,14 @@ public interface NetworkConnectorViewMBean extends Service { void setRemotePassword(String remotePassword); boolean isAutoStart(); + + long getStartedTimestamp(); + + long getStoppedTimestamp(); + + long getBridgeExceptionCount(); + + long getLocalExceptionCount(); + + long getRemoteExceptionCount(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 8d16445fb96..e4cd449245f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import javax.management.ObjectName; @@ -171,6 +172,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor(); private Transport duplexInboundLocalBroker = null; private ProducerInfo duplexInboundLocalProducerInfo; + private AtomicLong startedTimestamp = new AtomicLong(0L); public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { this.configuration = configuration; @@ -338,6 +340,7 @@ public void run() { startedLatch.countDown(); localStartedLatch.countDown(); staticDestinationsLatch.countDown(); + startedTimestamp.set(0L); ss.throwFirstException(); } @@ -372,6 +375,7 @@ public void run() { // Once we have all required broker info we can attempt to start // the local and then remote sides of the bridge. doStartLocalAndRemoteBridges(); + startedTimestamp.set(System.currentTimeMillis()); } finally { Thread.currentThread().setName(originalName); } @@ -647,6 +651,8 @@ protected void startRemoteBridge() throws Exception { @Override public void serviceRemoteException(Throwable error) { + networkBridgeStatistics.getRemoteExceptionCount().increment(); + if (!disposed.get()) { if (error instanceof SecurityException || error instanceof GeneralSecurityException) { LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", localBroker, remoteBroker, error.toString()); @@ -654,6 +660,7 @@ public void serviceRemoteException(Throwable error) { LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", localBroker, remoteBroker, error.toString()); } LOG.debug("The remote Exception was: {}", error, error); + brokerService.getTaskRunnerFactory().execute(new Runnable() { @Override public void run() { @@ -1112,6 +1119,8 @@ public void serviceLocalException(Throwable error) { public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) { LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error); + networkBridgeStatistics.getLocalExceptionCount().increment(); + if (!disposed.get()) { if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) { // not a reason to terminate the bridge - temps can disappear with @@ -1926,6 +1935,21 @@ public long getEnqueueCounter() { return networkBridgeStatistics.getEnqueues().getCount(); } + @Override + public long getStartedTimestamp() { + return startedTimestamp.get(); + } + + @Override + public long getLocalExceptionCount() { + return networkBridgeStatistics.getLocalExceptionCount().getCount(); + } + + @Override + public long getRemoteExceptionCount() { + return networkBridgeStatistics.getRemoteExceptionCount().getCount(); + } + @Override public NetworkBridgeStatistics getNetworkBridgeStatistics() { return networkBridgeStatistics; diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index 3b9696cf925..5ed54f17c1f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -131,6 +131,7 @@ public void onServiceAdd(DiscoveryEvent event) { try { remoteTransport = TransportFactory.connect(connectUri); } catch (Exception e) { + remoteExceptionCounter.increment(); LOG.warn("Could not connect to remote URI: {}: {}", connectUri, e.getMessage()); LOG.debug("Connection failure exception: ", e); try { @@ -143,6 +144,7 @@ public void onServiceAdd(DiscoveryEvent event) { try { localTransport = createLocalTransport(); } catch (Exception e) { + localExceptionCounter.increment(); ServiceSupport.dispose(remoteTransport); LOG.warn("Could not connect to local URI: {}: {}", localURI, e.getMessage()); LOG.debug("Connection failure exception: ", e); @@ -164,6 +166,7 @@ public void onServiceAdd(DiscoveryEvent event) { } bridge.start(); } catch (Exception e) { + bridgeExceptionCounter.increment(); ServiceSupport.dispose(localTransport); ServiceSupport.dispose(remoteTransport); LOG.warn("Could not start network bridge between: {} and: {} due to: {}", localURI, uri, e.getMessage()); diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java index fb9e3d9f086..42d0cd38f86 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java @@ -95,4 +95,10 @@ public interface NetworkBridge extends Service { ObjectName getMbeanObjectName(); void resetStats(); + + long getStartedTimestamp(); + + long getLocalExceptionCount(); + + long getRemoteExceptionCount(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java index 50c39512798..8b52db81947 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java @@ -17,6 +17,8 @@ package org.apache.activemq.network; +import java.util.Set; + import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.StatsImpl; @@ -28,15 +30,17 @@ public class NetworkBridgeStatistics extends StatsImpl { protected CountStatisticImpl enqueues; protected CountStatisticImpl dequeues; protected CountStatisticImpl receivedCount; + protected CountStatisticImpl localExceptionCount; + protected CountStatisticImpl remoteExceptionCount; public NetworkBridgeStatistics() { enqueues = new CountStatisticImpl("enqueues", "The current number of enqueues this bridge has, which is the number of potential messages to be forwarded."); dequeues = new CountStatisticImpl("dequeues", "The current number of dequeues this bridge has, which is the number of messages received by the remote broker."); receivedCount = new CountStatisticImpl("receivedCount", "The number of messages that have been received by the NetworkBridge from the remote broker. Only applies for Duplex bridges."); + localExceptionCount = new CountStatisticImpl("localExceptionCount", "The number of exceptions that have been received by the NetworkBridge from the local broker."); + remoteExceptionCount = new CountStatisticImpl("remoteExceptionCount", "The number of exceptions that have been received by the NetworkBridge from the remote broker."); - addStatistic("enqueues", enqueues); - addStatistic("dequeues", dequeues); - addStatistic("receivedCount", receivedCount); + addStatistics(Set.of(enqueues, dequeues, receivedCount, localExceptionCount, remoteExceptionCount)); } /** @@ -69,6 +73,26 @@ public CountStatisticImpl getReceivedCount() { return receivedCount; } + /** + * The current number of exceptions this bridge has, which is the number of + * exceptions received from the remote broker. + * + * @return + */ + public CountStatisticImpl getLocalExceptionCount() { + return localExceptionCount; + } + + /** + * The current number of exceptions this bridge has, which is the number of + * exceptions received from the remote broker. + * + * @return + */ + public CountStatisticImpl getRemoteExceptionCount() { + return remoteExceptionCount; + } + @Override public void reset() { if (this.isDoReset()) { @@ -76,6 +100,8 @@ public void reset() { enqueues.reset(); dequeues.reset(); receivedCount.reset(); + localExceptionCount.reset(); + remoteExceptionCount.reset(); } } @@ -85,6 +111,8 @@ public void setEnabled(boolean enabled) { enqueues.setEnabled(enabled); dequeues.setEnabled(enabled); receivedCount.setEnabled(enabled); + localExceptionCount.setEnabled(enabled); + remoteExceptionCount.setEnabled(enabled); } public void setParent(NetworkBridgeStatistics parent) { @@ -92,10 +120,14 @@ public void setParent(NetworkBridgeStatistics parent) { enqueues.setParent(parent.enqueues); dequeues.setParent(parent.dequeues); receivedCount.setParent(parent.receivedCount); + localExceptionCount.setParent(parent.localExceptionCount); + remoteExceptionCount.setParent(parent.remoteExceptionCount); } else { enqueues.setParent(null); dequeues.setParent(null); receivedCount.setParent(null); + localExceptionCount.setParent(null); + remoteExceptionCount.setParent(null); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java index 1bbc483d5c5..66eb25da8b5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -36,6 +37,8 @@ import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.management.CountStatistic; +import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.transport.Transport; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; @@ -51,6 +54,11 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem protected URI localURI; protected ConnectionFilter connectionFilter; protected ConcurrentMap bridges = new ConcurrentHashMap(); + protected final AtomicLong startedTimestamp = new AtomicLong(0l); + protected final AtomicLong stoppedTimestamp = new AtomicLong(0l); + protected final CountStatisticImpl bridgeExceptionCounter = new CountStatisticImpl("bridgeExceptionCount", "Count of exceptions when establishing network bridge."); + protected final CountStatisticImpl localExceptionCounter = new CountStatisticImpl("localExceptionCount", "Count of exceptions when connecting to local broker."); + protected final CountStatisticImpl remoteExceptionCounter = new CountStatisticImpl("remoteExceptionCount", "Count of exceptions when connecting to remote broker."); protected ServiceSupport serviceSupport = new ServiceSupport() { @@ -162,11 +170,15 @@ public static ActiveMQDestination[] getDurableTopicDestinations(final Set activeBridges() { return bridges.values(); } + + public long getStartedTimestamp() { + return startedTimestamp.get(); + } + + public long getStoppedTimestamp() { + return stoppedTimestamp.get(); + } + + public CountStatistic getBridgeExceptionCounter() { + return bridgeExceptionCounter; + } + + public CountStatistic getLocalExceptionCounter() { + return localExceptionCounter; + } + + public CountStatistic getRemoteExceptionCounter() { + return remoteExceptionCounter; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java index 5563ded869e..87ab07465e1 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java @@ -329,6 +329,21 @@ public long getDequeueCounter() { return next.getDequeueCounter(); } + @Override + public long getLocalExceptionCount() { + return next.getLocalExceptionCount(); + } + + @Override + public long getRemoteExceptionCount() { + return next.getRemoteExceptionCount(); + } + + @Override + public long getStartedTimestamp() { + return next.getStartedTimestamp(); + } + @Override public NetworkBridgeStatistics getNetworkBridgeStatistics() { return next.getNetworkBridgeStatistics(); @@ -348,6 +363,7 @@ public ObjectName getMbeanObjectName() { public void resetStats(){ next.resetStats(); } + }; } }; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java index 50f61d35dea..f96b449dc89 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.network; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -87,12 +88,12 @@ protected void doSetUp(boolean deleteAllMessages) throws Exception { ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); fac.setAlwaysSyncSend(true); fac.setDispatchAsync(false); - localConnection = fac.createConnection(); + localConnection = fac.createConnection("localAdmin", "passwordA"); localConnection.setClientID("localClientId"); URI remoteURI = remoteBroker.getVmConnectorURI(); fac = new ActiveMQConnectionFactory(remoteURI); - remoteConnection = fac.createConnection(); + remoteConnection = fac.createConnection("remoteAdmin", "passwordB"); remoteConnection.setClientID("remoteClientId"); localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -111,6 +112,56 @@ protected String getLocalBrokerURI() { //Added for AMQ-9437 test advancedStatistics for networkEnqueue and networkDequeue @Test(timeout = 120 * 1000) + public void testNetworkAdvancedStatisticsErrors() throws Exception { + NetworkConnector localNetworkConnector = localBroker.getNetworkConnectorByName("local-to-remote"); + var originalLocalPassword = localNetworkConnector.getPassword(); + var originalRemotePassword = localNetworkConnector.getRemotePassword(); + + assertNotNull(localNetworkConnector); + assertTrue(localNetworkConnector.isAutoStart()); + assertTrue(localNetworkConnector.isStarted()); + + assertNetworkStats(false, false, 0L, 0L, localNetworkConnector); + + try { + localNetworkConnector.stop(); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return localNetworkConnector.isStopped(); + } + }); + assertNetworkStats(true, true, 0L, 0L, localNetworkConnector); + + localNetworkConnector.setRemotePassword("foo"); + localNetworkConnector.start(); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return localNetworkConnector.isStarted(); + } + }); + + assertNetworkStats(false, false, 0L, 1L, localNetworkConnector); + + } finally { + localNetworkConnector.setPassword(originalLocalPassword); + localNetworkConnector.setRemotePassword(originalRemotePassword); + localNetworkConnector.start(); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return localNetworkConnector.isStarted(); + } + }); + } + } + + //Added for AMQ-9437 test advancedStatistics for networkEnqueue and networkDequeue + @Test(timeout = 60 * 1000) public void testNetworkAdvancedStatistics() throws Exception { // create a remote durable consumer to create demand @@ -200,7 +251,7 @@ public boolean isSatisified() throws Exception { assertEquals(lastIncludedSentMessageID, localBrokerIncludedMessageFlowStats.getDequeuedMessageID().getValue()); assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue()); assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue()); - assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageClientID().getValue().startsWith("networkConnector")); + assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageClientID().getValue().startsWith("local-to-remote")); assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageTimestamp().getValue()); if(includedDestination.isTopic() && !durable) { @@ -324,4 +375,21 @@ public boolean isSatisified() throws Exception { })); } -} \ No newline at end of file + protected static void assertNetworkStats(final boolean connectorStartTimestampZeroExpected, final boolean bridgeStartTimestampZeroExpected, final long localExceptionCount, final long remoteExceptionCount, final NetworkConnector networkConnector) { + if(connectorStartTimestampZeroExpected) { + assertEquals(Long.valueOf(0L), Long.valueOf(networkConnector.getStartedTimestamp())); + } else { + assertNotEquals(Long.valueOf(0L), Long.valueOf(networkConnector.getStartedTimestamp())); + } + + for(var networkBridge : networkConnector.activeBridges()) { + if(bridgeStartTimestampZeroExpected) { + assertEquals(Long.valueOf(0L), Long.valueOf(networkBridge.getStartedTimestamp())); + } else { + assertNotEquals(Long.valueOf(0L), Long.valueOf(networkBridge.getStartedTimestamp())); + } + assertEquals(Long.valueOf(localExceptionCount), Long.valueOf(networkBridge.getLocalExceptionCount())); + assertEquals(Long.valueOf(localExceptionCount), Long.valueOf(networkBridge.getRemoteExceptionCount())); + } + } +} diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml index f17fa9bcc53..4c89e565705 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml @@ -42,7 +42,11 @@ dynamicOnly = "false" conduitSubscriptions = "true" decreaseNetworkConsumerPriority = "false" - name="networkConnector"> + name="local-to-remote" + userName="localAdmin" + password="passwordA" + remoteUserName="remoteAdmin" + remotePassword="passwordB"> @@ -56,10 +60,32 @@ + + + + + + + + + + + + + + + + + + + + + + - + diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml index d2b3c0f33df..babbb6bb61e 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml @@ -37,9 +37,29 @@ - - - + + + + + + + + + + + + + + + + + + + + + + +