Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,21 @@ public void resetStats(){
}
}

@Override
public long getStartedTimestamp() {
return bridge.getStartedTimestamp();
}

@Override
public long getLocalExceptionCount() {
return bridge.getLocalExceptionCount();
}

@Override
public long getRemoteExceptionCount() {
return bridge.getRemoteExceptionCount();
}

public void addNetworkDestinationView(NetworkDestinationView networkDestinationView){
networkDestinationViewList.add(networkDestinationView);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public interface NetworkBridgeViewMBean extends Service {

void resetStats();

long getStartedTimestamp();

long getLocalExceptionCount();

long getRemoteExceptionCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,29 @@ public void setRemoteUserName(String remoteUserName) {
public boolean isAutoStart() {
return connector.isAutoStart();
}

@Override
public long getStartedTimestamp() {
return connector.getStartedTimestamp();
}

@Override
public long getStoppedTimestamp() {
return connector.getStoppedTimestamp();
}

@Override
public long getBridgeExceptionCount() {
return connector.getBridgeExceptionCounter().getCount();
}

@Override
public long getLocalExceptionCount() {
return connector.getLocalExceptionCounter().getCount();
}

@Override
public long getRemoteExceptionCount() {
return connector.getRemoteExceptionCounter().getCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,14 @@ public interface NetworkConnectorViewMBean extends Service {
void setRemotePassword(String remotePassword);

boolean isAutoStart();

long getStartedTimestamp();

long getStoppedTimestamp();

long getBridgeExceptionCount();

long getLocalExceptionCount();

long getRemoteExceptionCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

import javax.management.ObjectName;
Expand Down Expand Up @@ -171,6 +172,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
private Transport duplexInboundLocalBroker = null;
private ProducerInfo duplexInboundLocalProducerInfo;
private AtomicLong startedTimestamp = new AtomicLong(0L);

public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
this.configuration = configuration;
Expand Down Expand Up @@ -338,6 +340,7 @@ public void run() {
startedLatch.countDown();
localStartedLatch.countDown();
staticDestinationsLatch.countDown();
startedTimestamp.set(0L);

ss.throwFirstException();
}
Expand Down Expand Up @@ -372,6 +375,7 @@ public void run() {
// Once we have all required broker info we can attempt to start
// the local and then remote sides of the bridge.
doStartLocalAndRemoteBridges();
startedTimestamp.set(System.currentTimeMillis());
} finally {
Thread.currentThread().setName(originalName);
}
Expand Down Expand Up @@ -647,13 +651,16 @@ protected void startRemoteBridge() throws Exception {

@Override
public void serviceRemoteException(Throwable error) {
networkBridgeStatistics.getRemoteExceptionCount().increment();

if (!disposed.get()) {
if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", localBroker, remoteBroker, error.toString());
} else {
LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", localBroker, remoteBroker, error.toString());
}
LOG.debug("The remote Exception was: {}", error, error);

brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -1112,6 +1119,8 @@ public void serviceLocalException(Throwable error) {

public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
networkBridgeStatistics.getLocalExceptionCount().increment();

if (!disposed.get()) {
if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
// not a reason to terminate the bridge - temps can disappear with
Expand Down Expand Up @@ -1926,6 +1935,21 @@ public long getEnqueueCounter() {
return networkBridgeStatistics.getEnqueues().getCount();
}

@Override
public long getStartedTimestamp() {
return startedTimestamp.get();
}

@Override
public long getLocalExceptionCount() {
return networkBridgeStatistics.getLocalExceptionCount().getCount();
}

@Override
public long getRemoteExceptionCount() {
return networkBridgeStatistics.getRemoteExceptionCount().getCount();
}

@Override
public NetworkBridgeStatistics getNetworkBridgeStatistics() {
return networkBridgeStatistics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public void onServiceAdd(DiscoveryEvent event) {
try {
remoteTransport = TransportFactory.connect(connectUri);
} catch (Exception e) {
remoteExceptionCounter.increment();
LOG.warn("Could not connect to remote URI: {}: {}", connectUri, e.getMessage());
LOG.debug("Connection failure exception: ", e);
try {
Expand All @@ -143,6 +144,7 @@ public void onServiceAdd(DiscoveryEvent event) {
try {
localTransport = createLocalTransport();
} catch (Exception e) {
localExceptionCounter.increment();
ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not connect to local URI: {}: {}", localURI, e.getMessage());
LOG.debug("Connection failure exception: ", e);
Expand All @@ -164,6 +166,7 @@ public void onServiceAdd(DiscoveryEvent event) {
}
bridge.start();
} catch (Exception e) {
bridgeExceptionCounter.increment();
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not start network bridge between: {} and: {} due to: {}", localURI, uri, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,10 @@ public interface NetworkBridge extends Service {
ObjectName getMbeanObjectName();

void resetStats();

long getStartedTimestamp();

long getLocalExceptionCount();

long getRemoteExceptionCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.activemq.network;

import java.util.Set;

import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.StatsImpl;

Expand All @@ -28,15 +30,17 @@ public class NetworkBridgeStatistics extends StatsImpl {
protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
protected CountStatisticImpl receivedCount;
protected CountStatisticImpl localExceptionCount;
protected CountStatisticImpl remoteExceptionCount;

public NetworkBridgeStatistics() {
enqueues = new CountStatisticImpl("enqueues", "The current number of enqueues this bridge has, which is the number of potential messages to be forwarded.");
dequeues = new CountStatisticImpl("dequeues", "The current number of dequeues this bridge has, which is the number of messages received by the remote broker.");
receivedCount = new CountStatisticImpl("receivedCount", "The number of messages that have been received by the NetworkBridge from the remote broker. Only applies for Duplex bridges.");
localExceptionCount = new CountStatisticImpl("localExceptionCount", "The number of exceptions that have been received by the NetworkBridge from the local broker.");
remoteExceptionCount = new CountStatisticImpl("remoteExceptionCount", "The number of exceptions that have been received by the NetworkBridge from the remote broker.");

addStatistic("enqueues", enqueues);
addStatistic("dequeues", dequeues);
addStatistic("receivedCount", receivedCount);
addStatistics(Set.of(enqueues, dequeues, receivedCount, localExceptionCount, remoteExceptionCount));
}

/**
Expand Down Expand Up @@ -69,13 +73,35 @@ public CountStatisticImpl getReceivedCount() {
return receivedCount;
}

/**
* The current number of exceptions this bridge has, which is the number of
* exceptions received from the remote broker.
*
* @return
*/
public CountStatisticImpl getLocalExceptionCount() {
return localExceptionCount;
}

/**
* The current number of exceptions this bridge has, which is the number of
* exceptions received from the remote broker.
*
* @return
*/
public CountStatisticImpl getRemoteExceptionCount() {
return remoteExceptionCount;
}

@Override
public void reset() {
if (this.isDoReset()) {
super.reset();
enqueues.reset();
dequeues.reset();
receivedCount.reset();
localExceptionCount.reset();
remoteExceptionCount.reset();
}
}

Expand All @@ -85,17 +111,23 @@ public void setEnabled(boolean enabled) {
enqueues.setEnabled(enabled);
dequeues.setEnabled(enabled);
receivedCount.setEnabled(enabled);
localExceptionCount.setEnabled(enabled);
remoteExceptionCount.setEnabled(enabled);
}

public void setParent(NetworkBridgeStatistics parent) {
if (parent != null) {
enqueues.setParent(parent.enqueues);
dequeues.setParent(parent.dequeues);
receivedCount.setParent(parent.receivedCount);
localExceptionCount.setParent(parent.localExceptionCount);
remoteExceptionCount.setParent(parent.remoteExceptionCount);
} else {
enqueues.setParent(null);
dequeues.setParent(null);
receivedCount.setParent(null);
localExceptionCount.setParent(null);
remoteExceptionCount.setParent(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
Expand All @@ -36,6 +37,8 @@
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.management.CountStatistic;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
Expand All @@ -51,6 +54,11 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
protected URI localURI;
protected ConnectionFilter connectionFilter;
protected ConcurrentMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
protected final AtomicLong startedTimestamp = new AtomicLong(0l);
protected final AtomicLong stoppedTimestamp = new AtomicLong(0l);
protected final CountStatisticImpl bridgeExceptionCounter = new CountStatisticImpl("bridgeExceptionCount", "Count of exceptions when establishing network bridge.");
protected final CountStatisticImpl localExceptionCounter = new CountStatisticImpl("localExceptionCount", "Count of exceptions when connecting to local broker.");
protected final CountStatisticImpl remoteExceptionCounter = new CountStatisticImpl("remoteExceptionCount", "Count of exceptions when connecting to remote broker.");

protected ServiceSupport serviceSupport = new ServiceSupport() {

Expand Down Expand Up @@ -162,22 +170,35 @@ public static ActiveMQDestination[] getDurableTopicDestinations(final Set<Active
@Override
public void start() throws Exception {
serviceSupport.start();
startedTimestamp.set(System.currentTimeMillis());
stoppedTimestamp.set(0l);
}

@Override
public void stop() throws Exception {
serviceSupport.stop();
stoppedTimestamp.set(System.currentTimeMillis());
startedTimestamp.set(0l);
}

protected void handleStart() throws Exception {
if (localURI == null) {
throw new IllegalStateException("You must configure the 'localURI' property");
}
LOG.info("Network Connector {} started", this);
bridgeExceptionCounter.setEnabled(true);
localExceptionCounter.setEnabled(true);
remoteExceptionCounter.setEnabled(true);
bridgeExceptionCounter.setCount(0l);
localExceptionCounter.setCount(0l);
remoteExceptionCounter.setCount(0l);
}

protected void handleStop(ServiceStopper stopper) throws Exception {
LOG.info("Network Connector {} stopped", this);
bridgeExceptionCounter.reset();
localExceptionCounter.reset();
remoteExceptionCounter.reset();
}

public boolean isStarted() {
Expand Down Expand Up @@ -255,4 +276,24 @@ public boolean removeDemandSubscription(ConsumerId consumerId) {
public Collection<NetworkBridge> activeBridges() {
return bridges.values();
}

public long getStartedTimestamp() {
return startedTimestamp.get();
}

public long getStoppedTimestamp() {
return stoppedTimestamp.get();
}

public CountStatistic getBridgeExceptionCounter() {
return bridgeExceptionCounter;
}

public CountStatistic getLocalExceptionCounter() {
return localExceptionCounter;
}

public CountStatistic getRemoteExceptionCounter() {
return remoteExceptionCounter;
}
}
Loading
Loading