diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/BrokerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/BrokerCommand.java new file mode 100644 index 00000000000..fb926017cd8 --- /dev/null +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/BrokerCommand.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; +import org.apache.activemq.console.util.JmxMBeansUtil; + +public class BrokerCommand extends AbstractJmxCommand { + + protected String[] helpFile = new String[] { + "Task Usage: activemq broker [options] [args]", + "Description: Inspect broker attributes, manage transport connectors, and view the scheduler.", + "", + "Actions:", + " info Show broker attributes (name, version, uptime, memory, etc).", + " connectors List all transport connectors and their URIs.", + " connectors add Add a new transport connector (e.g. tcp://0.0.0.0:61617).", + " connectors remove Remove a transport connector by name.", + " scheduler Show scheduler statistics and upcoming jobs.", + "", + "Options:", + " --jmxurl Set the JMX URL to connect to.", + " --pid Set the pid to connect to (only on Sun JVM).", + " --jmxuser Set the JMX user used for authenticating.", + " --jmxpassword Set the JMX password used for authenticating.", + " --jmxlocal Use the local JMX server instead of a remote one.", + " --version Display the version information.", + " -h,-?,--help Display this help information.", + "", + "Examples:", + " activemq broker info", + " - Show broker name, version, uptime, memory/store usage, and connection counts.", + " activemq broker connectors", + " - List all transport connectors (tcp, ssl, amqp, stomp, mqtt, ws, etc).", + " activemq broker connectors add tcp://0.0.0.0:61617", + " - Add a new TCP transport connector on port 61617.", + " activemq broker connectors remove tcp", + " - Remove the transport connector named 'tcp'.", + " activemq broker scheduler", + " - Show scheduled message counts and upcoming jobs.", + "" + }; + + @Override + public String getName() { + return "broker"; + } + + @Override + public String getOneLineDescription() { + return "Show broker info, manage transport connectors, or inspect the scheduler"; + } + + @Override + protected void runTask(List tokens) throws Exception { + if (tokens.isEmpty()) { + printHelp(); + return; + } + + String action = tokens.remove(0); + if (action.equals("info")) { + showBrokerInfo(); + } else if (action.equals("connectors")) { + handleConnectors(tokens); + } else if (action.equals("scheduler")) { + showSchedulerInfo(); + } else { + context.printInfo("Unknown action '" + action + "'. See 'activemq broker --help'."); + printHelp(); + } + } + + private void handleConnectors(List tokens) throws Exception { + if (tokens.isEmpty() || tokens.get(0).equals("list")) { + listConnectors(); + return; + } + String subAction = tokens.remove(0); + if (subAction.equals("add")) { + if (tokens.isEmpty()) { + throw new IllegalArgumentException("Connector URI required: activemq broker connectors add "); + } + addConnector(tokens.get(0)); + } else if (subAction.equals("remove")) { + if (tokens.isEmpty()) { + throw new IllegalArgumentException("Connector name required: activemq broker connectors remove "); + } + removeConnector(tokens.get(0)); + } else { + context.printInfo("Unknown connectors sub-action '" + subAction + "'. Valid: list, add, remove."); + printHelp(); + } + } + + private void showBrokerInfo() throws Exception { + BrokerViewMBean broker = getBrokerMBean(); + + context.print("Broker Name : " + broker.getBrokerName()); + context.print("Broker ID : " + broker.getBrokerId()); + context.print("Version : " + broker.getBrokerVersion()); + context.print("Uptime : " + broker.getUptime()); + context.print("Persistent : " + broker.isPersistent()); + context.print("Data Directory : " + broker.getDataDirectory()); + context.print(""); + context.print("Connections : " + broker.getCurrentConnectionsCount() + " current / " + broker.getTotalConnectionsCount() + " total"); + context.print("Producers : " + broker.getTotalProducerCount()); + context.print("Consumers : " + broker.getTotalConsumerCount()); + context.print("Messages : " + broker.getTotalMessageCount()); + context.print(""); + context.print("Memory usage : " + broker.getMemoryPercentUsage() + "% (limit " + broker.getMemoryLimit() / (1024 * 1024) + " MB)"); + context.print("Store usage : " + broker.getStorePercentUsage() + "% (limit " + broker.getStoreLimit() / (1024 * 1024) + " MB)"); + context.print("Temp usage : " + broker.getTempPercentUsage() + "% (limit " + broker.getTempLimit() / (1024 * 1024) + " MB)"); + context.print(""); + context.print("Total enqueued : " + broker.getTotalEnqueueCount()); + context.print("Total dequeued : " + broker.getTotalDequeueCount()); + context.print("Queues : " + broker.getTotalQueuesCount()); + context.print("Topics : " + broker.getTotalTopicsCount()); + } + + private void listConnectors() throws Exception { + BrokerViewMBean broker = getBrokerMBean(); + Map connectors = broker.getTransportConnectors(); + + if (connectors == null || connectors.isEmpty()) { + context.print("No transport connectors configured."); + return; + } + + final String fmt = "%-12s %s"; + context.print(String.format(fmt, "Name", "URI")); + context.print(String.format(fmt, dashes(12), dashes(60))); + + for (Map.Entry entry : connectors.entrySet()) { + context.print(String.format(fmt, entry.getKey(), entry.getValue())); + } + } + + private void addConnector(String uri) throws Exception { + String connectorName = getBrokerMBean().addConnector(uri); + context.print("Transport connector added: " + connectorName + " -> " + uri); + } + + private void removeConnector(String name) throws Exception { + boolean removed = getBrokerMBean().removeConnector(name); + if (removed) { + context.print("Transport connector removed: " + name); + } else { + context.printInfo("Transport connector not found: " + name); + } + } + + private void showSchedulerInfo() throws Exception { + BrokerViewMBean broker = getBrokerMBean(); + ObjectName schedulerName = broker.getJMSJobScheduler(); + + if (schedulerName == null) { + context.print("Job scheduler is not enabled on this broker."); + return; + } + + JobSchedulerViewMBean scheduler = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), schedulerName, JobSchedulerViewMBean.class, true); + + context.print("Scheduled messages : " + scheduler.getScheduledMessageCount()); + context.print("Delayed messages : " + scheduler.getDelayedMessageCount()); + context.print("Next scheduled at : " + scheduler.getNextScheduleTime()); + + TabularData jobs = scheduler.getNextScheduleJobs(); + if (jobs == null || jobs.isEmpty()) { + context.print("No upcoming jobs."); + return; + } + + context.print(""); + context.print("Upcoming jobs:"); + context.print(dashes(60)); + + Collection rows = jobs.values(); + int index = 1; + for (Object row : rows) { + CompositeData job = (CompositeData) row; + context.print("Job " + index++ + ":"); + for (String key : job.getCompositeType().keySet()) { + Object value = job.get(key); + if (value != null && !value.toString().isEmpty()) { + context.print(String.format(" %-20s: %s", key, value)); + } + } + context.print(""); + } + } + + @SuppressWarnings("unchecked") + private BrokerViewMBean getBrokerMBean() throws Exception { + List brokers = JmxMBeansUtil.getAllBrokers(createJmxConnection()); + if (brokers.isEmpty()) { + throw new Exception("No broker found in JMX context."); + } + ObjectName brokerName = brokers.get(0).getObjectName(); + return MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), brokerName, BrokerViewMBean.class, true); + } + + private static String dashes(int count) { + StringBuilder sb = new StringBuilder(count); + for (int i = 0; i < count; i++) { + sb.append('-'); + } + return sb.toString(); + } + + @Override + protected void printHelp() { + context.printHelp(helpFile); + } +} diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/NetworkConnectorsCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/NetworkConnectorsCommand.java new file mode 100644 index 00000000000..25bd3e17d1e --- /dev/null +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/NetworkConnectorsCommand.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command; + +import java.util.List; + +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectInstance; +import javax.management.ObjectName; + +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; +import org.apache.activemq.console.util.JmxMBeansUtil; + +public class NetworkConnectorsCommand extends AbstractJmxCommand { + + protected String[] helpFile = new String[] { + "Task Usage: activemq network-connectors [options] [args]", + "Description: List, add, or remove network connectors on the broker.", + "", + "Actions:", + " list List all network connectors.", + " add Add a new network connector (e.g. static:(tcp://remote:61616)).", + " remove Remove a network connector by name.", + "", + "Options:", + " --jmxurl Set the JMX URL to connect to.", + " --pid Set the pid to connect to (only on Sun JVM).", + " --jmxuser Set the JMX user used for authenticating.", + " --jmxpassword Set the JMX password used for authenticating.", + " --jmxlocal Use the local JMX server instead of a remote one.", + " --version Display the version information.", + " -h,-?,--help Display this help information.", + "", + "Examples:", + " activemq network-connectors list", + " - List all configured network connectors.", + " activemq network-connectors add static:(tcp://remote-broker:61616)", + " - Add a static network connector to a remote broker.", + " activemq network-connectors remove NC1", + " - Remove the network connector named NC1.", + "" + }; + + @Override + public String getName() { + return "network-connectors"; + } + + @Override + public String getOneLineDescription() { + return "List, add, or remove network connectors"; + } + + @Override + protected void runTask(List tokens) throws Exception { + if (tokens.isEmpty()) { + printHelp(); + return; + } + + String action = tokens.remove(0); + if (action.equals("list")) { + listNetworkConnectors(); + } else if (action.equals("add")) { + if (tokens.isEmpty()) { + throw new IllegalArgumentException("Connector URI required: activemq network-connectors add "); + } + addNetworkConnector(tokens.get(0)); + } else if (action.equals("remove")) { + if (tokens.isEmpty()) { + throw new IllegalArgumentException("Connector name required: activemq network-connectors remove "); + } + removeNetworkConnector(tokens.get(0)); + } else { + context.printInfo("Unknown action '" + action + "'. See 'activemq network-connectors --help'."); + printHelp(); + } + } + + @SuppressWarnings("unchecked") + private void listNetworkConnectors() throws Exception { + List connectors = JmxMBeansUtil.queryMBeans(createJmxConnection(), + "type=Broker,brokerName=*,connector=networkConnectors,networkConnectorName=*"); + + if (connectors.isEmpty()) { + context.print("No network connectors configured."); + return; + } + + final String fmt = "%-30s %-10s %-10s %s"; + context.print(String.format(fmt, "Name", "Auto-Start", "Duplex", "User")); + context.print(String.format(fmt, dashes(30), dashes(10), dashes(10), dashes(20))); + + for (ObjectInstance obj : connectors) { + ObjectName name = obj.getObjectName(); + NetworkConnectorViewMBean nc = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), name, NetworkConnectorViewMBean.class, true); + context.print(String.format(fmt, + name.getKeyProperty("networkConnectorName"), + nc.isAutoStart(), + nc.isDuplex(), + nc.getUserName() != null ? nc.getUserName() : "")); + } + } + + private void addNetworkConnector(String uri) throws Exception { + String connectorName = getBrokerMBean().addNetworkConnector(uri); + context.print("Network connector added: " + connectorName + " -> " + uri); + } + + private void removeNetworkConnector(String name) throws Exception { + boolean removed = getBrokerMBean().removeNetworkConnector(name); + if (removed) { + context.print("Network connector removed: " + name); + } else { + context.printInfo("Network connector not found: " + name); + } + } + + @SuppressWarnings("unchecked") + private BrokerViewMBean getBrokerMBean() throws Exception { + List brokers = JmxMBeansUtil.getAllBrokers(createJmxConnection()); + if (brokers.isEmpty()) { + throw new Exception("No broker found in JMX context."); + } + ObjectName brokerName = brokers.get(0).getObjectName(); + return MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), brokerName, BrokerViewMBean.class, true); + } + + private static String dashes(int count) { + StringBuilder sb = new StringBuilder(count); + for (int i = 0; i < count; i++) { + sb.append('-'); + } + return sb.toString(); + } + + @Override + protected void printHelp() { + context.printHelp(helpFile); + } +} diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/QueuesCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/QueuesCommand.java new file mode 100644 index 00000000000..27c5c7a280d --- /dev/null +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/QueuesCommand.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; + +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; + +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.console.util.JmxMBeansUtil; + +public class QueuesCommand extends AbstractJmxCommand { + + protected String[] helpFile = new String[] { + "Task Usage: activemq queues [options] [queue-name] [args]", + "Description: List, create, delete, purge, browse, produce, pause, resume, or inspect queues.", + "", + "Actions:", + " list List all queues with their key statistics.", + " create Create a new queue.", + " delete Delete an existing queue.", + " purge Purge all messages from a queue.", + " info Show detailed statistics for a queue.", + " browse Browse messages in a queue (non-destructive).", + " produce Send a text message to a queue.", + " pause Pause dispatch on a queue.", + " resume Resume dispatch on a paused queue.", + "", + "Options:", + " --jmxurl Set the JMX URL to connect to.", + " --pid Set the pid to connect to (only on Sun JVM).", + " --jmxuser Set the JMX user used for authenticating.", + " --jmxpassword Set the JMX password used for authenticating.", + " --jmxlocal Use the local JMX server instead of a remote one.", + " --version Display the version information.", + " -h,-?,--help Display this help information.", + "", + "Examples:", + " activemq queues list", + " - List all queues with their statistics.", + " activemq queues create FOO.BAR", + " - Create a new queue named FOO.BAR.", + " activemq queues delete FOO.BAR", + " - Delete the queue named FOO.BAR.", + " activemq queues purge FOO.BAR", + " - Purge all messages from queue FOO.BAR.", + " activemq queues info FOO.BAR", + " - Show detailed statistics for queue FOO.BAR.", + " activemq queues browse FOO.BAR", + " - Browse all messages in queue FOO.BAR without consuming them.", + " activemq queues produce FOO.BAR \"Hello World\"", + " - Send a text message with body 'Hello World' to queue FOO.BAR.", + " activemq queues pause FOO.BAR", + " - Pause message dispatch on queue FOO.BAR.", + " activemq queues resume FOO.BAR", + " - Resume message dispatch on queue FOO.BAR.", + "" + }; + + @Override + public String getName() { + return "queues"; + } + + @Override + public String getOneLineDescription() { + return "List, create, delete, purge, browse, produce, pause, or resume queues"; + } + + @Override + protected void runTask(List tokens) throws Exception { + if (tokens.isEmpty()) { + printHelp(); + return; + } + + String action = tokens.remove(0); + if (action.equals("list")) { + listQueues(); + } else if (action.equals("create")) { + requireQueueName(tokens, "create"); + createQueue(tokens.get(0)); + } else if (action.equals("delete")) { + requireQueueName(tokens, "delete"); + deleteQueue(tokens.get(0)); + } else if (action.equals("purge")) { + requireQueueName(tokens, "purge"); + purgeQueue(tokens.get(0)); + } else if (action.equals("info")) { + requireQueueName(tokens, "info"); + infoQueue(tokens.get(0)); + } else if (action.equals("browse")) { + requireQueueName(tokens, "browse"); + browseQueue(tokens.get(0)); + } else if (action.equals("produce")) { + requireQueueName(tokens, "produce"); + if (tokens.size() < 2) { + throw new IllegalArgumentException("Message body required: activemq queues produce "); + } + produceMessage(tokens.get(0), tokens.get(1)); + } else if (action.equals("pause")) { + requireQueueName(tokens, "pause"); + pauseQueue(tokens.get(0)); + } else if (action.equals("resume")) { + requireQueueName(tokens, "resume"); + resumeQueue(tokens.get(0)); + } else { + context.printInfo("Unknown action '" + action + "'. See 'activemq queues --help'."); + printHelp(); + } + } + + @SuppressWarnings("unchecked") + private void listQueues() throws Exception { + List queueList = JmxMBeansUtil.queryMBeans(createJmxConnection(), + "type=Broker,brokerName=*,destinationType=Queue,destinationName=*"); + + Collections.sort(queueList, new Comparator() { + @Override + public int compare(ObjectInstance o1, ObjectInstance o2) { + return o1.getObjectName().compareTo(o2.getObjectName()); + } + }); + + final String fmt = "%-50s %10s %10s %10s %10s %10s"; + context.print(String.format(Locale.US, fmt, "Name", "Messages", "Consumers", "Producers", "Enqueued", "Dequeued")); + context.print(String.format(Locale.US, fmt, + dashes(50), dashes(10), dashes(10), dashes(10), dashes(10), dashes(10))); + + for (ObjectInstance obj : queueList) { + ObjectName name = obj.getObjectName(); + QueueViewMBean q = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), name, QueueViewMBean.class, true); + context.print(String.format(Locale.US, "%-50s %10d %10d %10d %10d %10d", + q.getName(), + q.getQueueSize(), + q.getConsumerCount(), + q.getProducerCount(), + q.getEnqueueCount(), + q.getDequeueCount())); + } + + if (queueList.isEmpty()) { + context.print("No queues found."); + } + } + + private void createQueue(String queueName) throws Exception { + getBrokerMBean().addQueue(queueName); + context.print("Queue created: " + queueName); + } + + private void deleteQueue(String queueName) throws Exception { + getBrokerMBean().removeQueue(queueName); + context.print("Queue deleted: " + queueName); + } + + @SuppressWarnings("unchecked") + private void purgeQueue(String queueName) throws Exception { + List results = JmxMBeansUtil.queryMBeans(createJmxConnection(), + "type=Broker,brokerName=*,destinationType=Queue,destinationName=" + queueName); + if (results.isEmpty()) { + context.printInfo("Queue not found: " + queueName); + return; + } + ObjectName name = results.get(0).getObjectName(); + QueueViewMBean q = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), name, QueueViewMBean.class, true); + q.purge(); + context.print("Queue purged: " + queueName); + } + + @SuppressWarnings("unchecked") + private void infoQueue(String queueName) throws Exception { + List results = JmxMBeansUtil.queryMBeans(createJmxConnection(), + "type=Broker,brokerName=*,destinationType=Queue,destinationName=" + queueName); + if (results.isEmpty()) { + context.printInfo("Queue not found: " + queueName); + return; + } + ObjectName name = results.get(0).getObjectName(); + QueueViewMBean q = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), name, QueueViewMBean.class, true); + + context.print("Name : " + q.getName()); + context.print("Messages : " + q.getQueueSize()); + context.print("Consumers : " + q.getConsumerCount()); + context.print("Producers : " + q.getProducerCount()); + context.print("Enqueued : " + q.getEnqueueCount()); + context.print("Dequeued : " + q.getDequeueCount()); + context.print("In-flight : " + q.getInFlightCount()); + context.print("Memory usage : " + q.getMemoryPercentUsage() + "%"); + context.print("Paused : " + q.isPaused()); + } + + @SuppressWarnings("unchecked") + private void browseQueue(String queueName) throws Exception { + List results = JmxMBeansUtil.queryMBeans(createJmxConnection(), + "type=Broker,brokerName=*,destinationType=Queue,destinationName=" + queueName); + if (results.isEmpty()) { + context.printInfo("Queue not found: " + queueName); + return; + } + ObjectName name = results.get(0).getObjectName(); + QueueViewMBean q = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), name, QueueViewMBean.class, true); + CompositeData[] messages = q.browse(); + if (messages == null || messages.length == 0) { + context.print("No messages in queue: " + queueName); + return; + } + context.print("Browsing queue: " + queueName + " (" + messages.length + " message(s))"); + for (int i = 0; i < messages.length; i++) { + CompositeData msg = messages[i]; + context.print(""); + context.print("--- Message " + (i + 1) + " ---"); + for (String key : msg.getCompositeType().keySet()) { + Object value = msg.get(key); + if (value != null && !value.toString().isEmpty()) { + context.print(String.format(" %-24s: %s", key, value)); + } + } + } + } + + @SuppressWarnings("unchecked") + private void produceMessage(String queueName, String body) throws Exception { + List results = JmxMBeansUtil.queryMBeans(createJmxConnection(), + "type=Broker,brokerName=*,destinationType=Queue,destinationName=" + queueName); + if (results.isEmpty()) { + context.printInfo("Queue not found: " + queueName); + return; + } + ObjectName name = results.get(0).getObjectName(); + QueueViewMBean q = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), name, QueueViewMBean.class, true); + String messageId = q.sendTextMessage(body); + context.print("Message sent to " + queueName + ". ID: " + messageId); + } + + @SuppressWarnings("unchecked") + private void pauseQueue(String queueName) throws Exception { + List results = JmxMBeansUtil.queryMBeans(createJmxConnection(), + "type=Broker,brokerName=*,destinationType=Queue,destinationName=" + queueName); + if (results.isEmpty()) { + context.printInfo("Queue not found: " + queueName); + return; + } + ObjectName name = results.get(0).getObjectName(); + QueueViewMBean q = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), name, QueueViewMBean.class, true); + q.pause(); + context.print("Queue paused: " + queueName); + } + + @SuppressWarnings("unchecked") + private void resumeQueue(String queueName) throws Exception { + List results = JmxMBeansUtil.queryMBeans(createJmxConnection(), + "type=Broker,brokerName=*,destinationType=Queue,destinationName=" + queueName); + if (results.isEmpty()) { + context.printInfo("Queue not found: " + queueName); + return; + } + ObjectName name = results.get(0).getObjectName(); + QueueViewMBean q = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), name, QueueViewMBean.class, true); + q.resume(); + context.print("Queue resumed: " + queueName); + } + + @SuppressWarnings("unchecked") + private BrokerViewMBean getBrokerMBean() throws Exception { + List brokers = JmxMBeansUtil.getAllBrokers(createJmxConnection()); + if (brokers.isEmpty()) { + throw new Exception("No broker found in JMX context."); + } + ObjectName brokerName = brokers.get(0).getObjectName(); + return MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), brokerName, BrokerViewMBean.class, true); + } + + private void requireQueueName(List tokens, String action) throws Exception { + if (tokens.isEmpty()) { + throw new IllegalArgumentException("Queue name required for '" + action + "'."); + } + } + + private static String dashes(int count) { + StringBuilder sb = new StringBuilder(count); + for (int i = 0; i < count; i++) { + sb.append('-'); + } + return sb.toString(); + } + + @Override + protected void printHelp() { + context.printHelp(helpFile); + } +} diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/TopicsCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/TopicsCommand.java new file mode 100644 index 00000000000..3b01d15fca9 --- /dev/null +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/TopicsCommand.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; + +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectInstance; +import javax.management.ObjectName; + +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.TopicViewMBean; +import org.apache.activemq.console.util.JmxMBeansUtil; + +public class TopicsCommand extends AbstractJmxCommand { + + protected String[] helpFile = new String[] { + "Task Usage: activemq topics [options] [topic-name]", + "Description: List, create, delete, or inspect topics on the broker.", + "", + "Actions:", + " list List all topics with their key statistics.", + " create Create a new topic.", + " delete Delete an existing topic.", + " info Show detailed statistics for a topic.", + "", + "Options:", + " --jmxurl Set the JMX URL to connect to.", + " --pid Set the pid to connect to (only on Sun JVM).", + " --jmxuser Set the JMX user used for authenticating.", + " --jmxpassword Set the JMX password used for authenticating.", + " --jmxlocal Use the local JMX server instead of a remote one.", + " --version Display the version information.", + " -h,-?,--help Display this help information.", + "", + "Examples:", + " activemq topics list", + " - List all topics with their statistics.", + " activemq topics create MY.TOPIC", + " - Create a new topic named MY.TOPIC.", + " activemq topics delete MY.TOPIC", + " - Delete the topic named MY.TOPIC.", + " activemq topics info MY.TOPIC", + " - Show detailed statistics for topic MY.TOPIC.", + "" + }; + + @Override + public String getName() { + return "topics"; + } + + @Override + public String getOneLineDescription() { + return "List, create, delete, or inspect topics"; + } + + @Override + protected void runTask(List tokens) throws Exception { + if (tokens.isEmpty()) { + printHelp(); + return; + } + + String action = tokens.remove(0); + if (action.equals("list")) { + listTopics(); + } else if (action.equals("create")) { + requireTopicName(tokens, "create"); + createTopic(tokens.get(0)); + } else if (action.equals("delete")) { + requireTopicName(tokens, "delete"); + deleteTopic(tokens.get(0)); + } else if (action.equals("info")) { + requireTopicName(tokens, "info"); + infoTopic(tokens.get(0)); + } else { + context.printInfo("Unknown action '" + action + "'. See 'activemq topics --help'."); + printHelp(); + } + } + + @SuppressWarnings("unchecked") + private void listTopics() throws Exception { + List topicList = JmxMBeansUtil.queryMBeans(createJmxConnection(), + "type=Broker,brokerName=*,destinationType=Topic,destinationName=*"); + + Collections.sort(topicList, new Comparator() { + @Override + public int compare(ObjectInstance o1, ObjectInstance o2) { + return o1.getObjectName().compareTo(o2.getObjectName()); + } + }); + + final String fmt = "%-50s %10s %10s %10s %10s %10s"; + context.print(String.format(Locale.US, fmt, "Name", "Messages", "Consumers", "Producers", "Enqueued", "Dequeued")); + context.print(String.format(Locale.US, fmt, + dashes(50), dashes(10), dashes(10), dashes(10), dashes(10), dashes(10))); + + for (ObjectInstance obj : topicList) { + ObjectName name = obj.getObjectName(); + TopicViewMBean t = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), name, TopicViewMBean.class, true); + context.print(String.format(Locale.US, "%-50s %10d %10d %10d %10d %10d", + t.getName(), + t.getQueueSize(), + t.getConsumerCount(), + t.getProducerCount(), + t.getEnqueueCount(), + t.getDequeueCount())); + } + + if (topicList.isEmpty()) { + context.print("No topics found."); + } + } + + private void createTopic(String topicName) throws Exception { + getBrokerMBean().addTopic(topicName); + context.print("Topic created: " + topicName); + } + + private void deleteTopic(String topicName) throws Exception { + getBrokerMBean().removeTopic(topicName); + context.print("Topic deleted: " + topicName); + } + + @SuppressWarnings("unchecked") + private void infoTopic(String topicName) throws Exception { + List results = JmxMBeansUtil.queryMBeans(createJmxConnection(), + "type=Broker,brokerName=*,destinationType=Topic,destinationName=" + topicName); + if (results.isEmpty()) { + context.printInfo("Topic not found: " + topicName); + return; + } + ObjectName name = results.get(0).getObjectName(); + TopicViewMBean t = MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), name, TopicViewMBean.class, true); + + context.print("Name : " + t.getName()); + context.print("Messages : " + t.getQueueSize()); + context.print("Consumers : " + t.getConsumerCount()); + context.print("Producers : " + t.getProducerCount()); + context.print("Enqueued : " + t.getEnqueueCount()); + context.print("Dequeued : " + t.getDequeueCount()); + context.print("In-flight : " + t.getInFlightCount()); + context.print("Memory usage : " + t.getMemoryPercentUsage() + "%"); + } + + @SuppressWarnings("unchecked") + private BrokerViewMBean getBrokerMBean() throws Exception { + List brokers = JmxMBeansUtil.getAllBrokers(createJmxConnection()); + if (brokers.isEmpty()) { + throw new Exception("No broker found in JMX context."); + } + ObjectName brokerName = brokers.get(0).getObjectName(); + return MBeanServerInvocationHandler.newProxyInstance( + createJmxConnection(), brokerName, BrokerViewMBean.class, true); + } + + private void requireTopicName(List tokens, String action) throws Exception { + if (tokens.isEmpty()) { + throw new IllegalArgumentException("Topic name required for '" + action + "'."); + } + } + + private static String dashes(int count) { + StringBuilder sb = new StringBuilder(count); + for (int i = 0; i < count; i++) { + sb.append('-'); + } + return sb.toString(); + } + + @Override + protected void printHelp() { + context.printHelp(helpFile); + } +} diff --git a/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command b/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command index 7df27efab85..b4b5e638e68 100644 --- a/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command +++ b/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command @@ -29,3 +29,7 @@ org.apache.activemq.console.command.PurgeCommand org.apache.activemq.console.command.ProducerCommand org.apache.activemq.console.command.ConsumerCommand org.apache.activemq.console.command.StoreBackupCommand +org.apache.activemq.console.command.QueuesCommand +org.apache.activemq.console.command.TopicsCommand +org.apache.activemq.console.command.BrokerCommand +org.apache.activemq.console.command.NetworkConnectorsCommand diff --git a/activemq-console/src/test/java/org/apache/activemq/console/BrokerCommandTest.java b/activemq-console/src/test/java/org/apache/activemq/console/BrokerCommandTest.java new file mode 100644 index 00000000000..838fecd7c7e --- /dev/null +++ b/activemq-console/src/test/java/org/apache/activemq/console/BrokerCommandTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.util.LinkedList; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.console.command.BrokerCommand; +import org.apache.activemq.console.formatter.CommandShellOutputFormatter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class BrokerCommandTest { + + private static final String CONNECTOR_NAME = "test-tcp"; + + private BrokerService brokerService; + + @Before + public void createBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.setPersistent(false); + brokerService.addConnector("tcp://0.0.0.0:0").setName(CONNECTOR_NAME); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + // --- info --- + + @Test(timeout = 30000) + public void testBrokerInfoShowsName() throws Exception { + String result = execute("info"); + assertTrue("broker name present", result.contains("localhost")); + assertTrue("version field present", result.contains("Version")); + assertTrue("uptime field present", result.contains("Uptime")); + assertTrue("memory usage field present", result.contains("Memory usage")); + assertTrue("store usage field present", result.contains("Store usage")); + assertTrue("queues count field present", result.contains("Queues")); + assertTrue("topics count field present", result.contains("Topics")); + } + + @Test(timeout = 30000) + public void testBrokerInfoShowsConnectionCounts() throws Exception { + String result = execute("info"); + assertTrue("connections field present", result.contains("Connections")); + assertTrue("producers field present", result.contains("Producers")); + assertTrue("consumers field present", result.contains("Consumers")); + } + + // --- connectors --- + + @Test(timeout = 30000) + public void testListConnectorsShowsConfiguredConnector() throws Exception { + String result = execute("connectors"); + assertTrue("connector name present", result.contains(CONNECTOR_NAME)); + assertTrue("URI column present", result.contains("URI")); + } + + @Test(timeout = 30000) + public void testListConnectorsExplicitSubAction() throws Exception { + String result = execute("connectors", "list"); + assertTrue("connector name present", result.contains(CONNECTOR_NAME)); + } + + @Test(timeout = 30000) + public void testConnectorsUnknownSubActionShowsHelp() throws Exception { + String result = execute("connectors", "badsubaction"); + assertTrue("unknown sub-action message", result.contains("Unknown connectors sub-action")); + } + + // --- scheduler --- + + @Test(timeout = 30000) + public void testSchedulerDisabledMessage() throws Exception { + // Default broker has no scheduler — verify the "not enabled" message + String result = execute("scheduler"); + assertTrue("scheduler not enabled message", result.contains("not enabled")); + } + + @Test(timeout = 30000) + public void testSchedulerEnabledShowsCounts() throws Exception { + // Restart broker with scheduler support enabled + brokerService.stop(); + brokerService = new BrokerService(); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.setPersistent(false); + brokerService.setSchedulerSupport(true); + brokerService.start(); + brokerService.waitUntilStarted(); + + String result = execute("scheduler"); + assertTrue("scheduled messages count present", result.contains("Scheduled messages")); + assertTrue("delayed messages count present", result.contains("Delayed messages")); + assertTrue("next scheduled at present", result.contains("Next scheduled at")); + } + + // --- error handling --- + + @Test(timeout = 30000) + public void testUnknownActionShowsHelp() throws Exception { + String result = execute("badaction"); + assertTrue("unknown action message", result.contains("Unknown action")); + } + + @Test(timeout = 30000) + public void testNoActionShowsHelp() throws Exception { + String result = execute(); + assertTrue("help content shown", result.contains("Actions:")); + } + + // --- helpers --- + + private String execute(String... args) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(1024); + CommandContext context = new CommandContext(); + context.setFormatter(new CommandShellOutputFormatter(out)); + + BrokerCommand cmd = new BrokerCommand(); + cmd.setJmxUseLocal(true); + cmd.setCommandContext(context); + + LinkedList tokens = new LinkedList<>(); + for (String arg : args) { + tokens.add(arg); + } + cmd.execute(tokens); + return out.toString(); + } +} diff --git a/activemq-console/src/test/java/org/apache/activemq/console/NetworkConnectorsCommandTest.java b/activemq-console/src/test/java/org/apache/activemq/console/NetworkConnectorsCommandTest.java new file mode 100644 index 00000000000..9beda531db5 --- /dev/null +++ b/activemq-console/src/test/java/org/apache/activemq/console/NetworkConnectorsCommandTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console; + +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.util.LinkedList; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.console.command.NetworkConnectorsCommand; +import org.apache.activemq.console.formatter.CommandShellOutputFormatter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class NetworkConnectorsCommandTest { + + private static final String NC_NAME = "test-nc"; + + private BrokerService brokerService; + + @Before + public void createBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.setPersistent(false); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + // --- list --- + + @Test(timeout = 30000) + public void testListWithNoNetworkConnectors() throws Exception { + String result = execute("list"); + assertTrue("empty message shown", result.contains("No network connectors configured.")); + } + + @Test(timeout = 30000) + public void testListShowsConfiguredConnector() throws Exception { + // Add a network connector directly to the broker before testing + DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector(); + nc.setName(NC_NAME); + // Use a discovery URI that won't fail immediately even without a remote broker + nc.setUri(new java.net.URI("static:(failover:(tcp://localhost:0))")); + brokerService.addNetworkConnector(nc); + brokerService.registerNetworkConnectorMBean(nc); + + String result = execute("list"); + assertTrue("connector name in output", result.contains(NC_NAME)); + assertTrue("Auto-Start column present", result.contains("Auto-Start")); + assertTrue("Duplex column present", result.contains("Duplex")); + } + + // --- error handling --- + + @Test(timeout = 30000) + public void testUnknownActionShowsHelp() throws Exception { + String result = execute("badaction"); + assertTrue("unknown action message", result.contains("Unknown action")); + } + + @Test(timeout = 30000) + public void testNoActionShowsHelp() throws Exception { + String result = execute(); + assertTrue("help content shown", result.contains("Actions:")); + } + + @Test(timeout = 30000) + public void testAddMissingUriThrows() throws Exception { + try { + execute("add"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("URI required")); + } + } + + @Test(timeout = 30000) + public void testRemoveMissingNameThrows() throws Exception { + try { + execute("remove"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("name required")); + } + } + + // --- helpers --- + + private String execute(String... args) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(1024); + CommandContext context = new CommandContext(); + context.setFormatter(new CommandShellOutputFormatter(out)); + + NetworkConnectorsCommand cmd = new NetworkConnectorsCommand(); + cmd.setJmxUseLocal(true); + cmd.setCommandContext(context); + + LinkedList tokens = new LinkedList<>(); + for (String arg : args) { + tokens.add(arg); + } + cmd.execute(tokens); + return out.toString(); + } +} diff --git a/activemq-console/src/test/java/org/apache/activemq/console/QueuesCommandTest.java b/activemq-console/src/test/java/org/apache/activemq/console/QueuesCommandTest.java new file mode 100644 index 00000000000..281fd2c5012 --- /dev/null +++ b/activemq-console/src/test/java/org/apache/activemq/console/QueuesCommandTest.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.util.LinkedList; + +import jakarta.jms.Connection; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.console.command.QueuesCommand; +import org.apache.activemq.console.formatter.CommandShellOutputFormatter; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class QueuesCommandTest { + + @Rule + public TestName name = new TestName(); + + private BrokerService brokerService; + private ActiveMQConnectionFactory connectionFactory; + + @Before + public void createBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.setPersistent(false); + brokerService.setDestinations(new ActiveMQDestination[]{ + new ActiveMQQueue("Q1"), + new ActiveMQQueue("Q2") + }); + TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0"); + brokerService.start(); + brokerService.waitUntilStarted(); + connectionFactory = new ActiveMQConnectionFactory(connector.getPublishableConnectString()); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + // --- list --- + + @Test(timeout = 30000) + public void testListShowsAllQueues() throws Exception { + String result = execute("list"); + assertTrue("Q1 in output", result.contains("Q1")); + assertTrue("Q2 in output", result.contains("Q2")); + assertTrue("column header present", result.contains("Messages")); + assertTrue("column header present", result.contains("Consumers")); + } + + // --- create --- + + @Test(timeout = 30000) + public void testCreateQueue() throws Exception { + String result = execute("create", "NEW.QUEUE"); + assertTrue("confirmation message", result.contains("Queue created: NEW.QUEUE")); + + // Verify via JMX proxy that the queue now exists + QueueViewMBean proxy = getQueueProxy("NEW.QUEUE"); + assertEquals(0, proxy.getQueueSize()); + } + + // --- delete --- + + @Test(timeout = 30000) + public void testDeleteQueue() throws Exception { + // Q1 was created at startup; delete it + String result = execute("delete", "Q1"); + assertTrue("confirmation message", result.contains("Queue deleted: Q1")); + + // Verify Q1 no longer appears in list + String listResult = execute("list"); + assertFalse("Q1 gone from list", listResult.contains("Q1")); + assertTrue("Q2 still present", listResult.contains("Q2")); + } + + // --- purge --- + + @Test(timeout = 30000) + public void testPurgeQueue() throws Exception { + sendMessages("Q1", 5); + + QueueViewMBean proxy = getQueueProxy("Q1"); + assertEquals("5 messages before purge", 5, proxy.getQueueSize()); + + String result = execute("purge", "Q1"); + assertTrue("confirmation message", result.contains("Queue purged: Q1")); + assertEquals("0 messages after purge", 0, proxy.getQueueSize()); + } + + // --- info --- + + @Test(timeout = 30000) + public void testInfoQueue() throws Exception { + sendMessages("Q1", 3); + String result = execute("info", "Q1"); + + assertTrue("name field", result.contains("Q1")); + assertTrue("Messages field", result.contains("Messages")); + assertTrue("Consumers field", result.contains("Consumers")); + assertTrue("Producers field", result.contains("Producers")); + assertTrue("Memory usage field", result.contains("Memory usage")); + assertTrue("Paused field", result.contains("Paused")); + } + + @Test(timeout = 30000) + public void testInfoQueueNotFound() throws Exception { + String result = execute("info", "DOES.NOT.EXIST"); + assertTrue("not found message", result.contains("not found")); + } + + // --- browse --- + + @Test(timeout = 30000) + public void testBrowseQueue() throws Exception { + sendTextMessages("Q1", "hello-browse-msg", 2); + + String result = execute("browse", "Q1"); + assertTrue("browse header present", result.contains("Browsing queue")); + assertTrue("message count in header", result.contains("2 message(s)")); + assertTrue("JMSMessageID field present", result.contains("JMSMessageID")); + assertTrue("message body present", result.contains("hello-browse-msg")); + } + + @Test(timeout = 30000) + public void testBrowseEmptyQueue() throws Exception { + String result = execute("browse", "Q1"); + assertTrue("empty queue message", result.contains("No messages in queue")); + } + + // --- produce --- + + @Test(timeout = 30000) + public void testProduceMessage() throws Exception { + QueueViewMBean proxy = getQueueProxy("Q1"); + assertEquals("queue empty before produce", 0, proxy.getQueueSize()); + + String result = execute("produce", "Q1", "test-message-body"); + assertTrue("confirmation with message ID", result.contains("Message sent to Q1")); + assertTrue("message ID in output", result.contains("ID:")); + + assertEquals("queue has 1 message after produce", 1, proxy.getQueueSize()); + } + + @Test(timeout = 30000) + public void testProduceMissingBody() throws Exception { + try { + execute("produce", "Q1"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("body required")); + } + } + + // --- pause / resume --- + + @Test(timeout = 30000) + public void testPauseQueue() throws Exception { + QueueViewMBean proxy = getQueueProxy("Q1"); + assertFalse("queue not paused initially", proxy.isPaused()); + + String result = execute("pause", "Q1"); + assertTrue("confirmation message", result.contains("Queue paused: Q1")); + assertTrue("queue is now paused", proxy.isPaused()); + } + + @Test(timeout = 30000) + public void testResumeQueue() throws Exception { + // Pause first + execute("pause", "Q1"); + QueueViewMBean proxy = getQueueProxy("Q1"); + assertTrue("queue is paused", proxy.isPaused()); + + String result = execute("resume", "Q1"); + assertTrue("confirmation message", result.contains("Queue resumed: Q1")); + assertFalse("queue is no longer paused", proxy.isPaused()); + } + + // --- error handling --- + + @Test(timeout = 30000) + public void testUnknownActionShowsHelp() throws Exception { + String result = execute("badaction"); + assertTrue("unknown action message", result.contains("Unknown action")); + } + + @Test(timeout = 30000) + public void testNoActionShowsHelp() throws Exception { + String result = execute(); + assertTrue("help content shown", result.contains("Actions:")); + } + + // --- helpers --- + + private String execute(String... args) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(1024); + CommandContext context = new CommandContext(); + context.setFormatter(new CommandShellOutputFormatter(out)); + + QueuesCommand cmd = new QueuesCommand(); + cmd.setJmxUseLocal(true); + cmd.setCommandContext(context); + + LinkedList tokens = new LinkedList<>(); + for (String arg : args) { + tokens.add(arg); + } + cmd.execute(tokens); + return out.toString(); + } + + private QueueViewMBean getQueueProxy(String queueName) throws Exception { + ObjectName objectName = new ObjectName( + "org.apache.activemq:type=Broker,brokerName=localhost" + + ",destinationType=Queue,destinationName=" + queueName); + return (QueueViewMBean) brokerService.getManagementContext() + .newProxyInstance(objectName, QueueViewMBean.class, true); + } + + private void sendMessages(String queueName, int count) throws Exception { + Connection conn = connectionFactory.createConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + jakarta.jms.Queue dest = session.createQueue(queueName); + MessageProducer producer = session.createProducer(dest); + for (int i = 0; i < count; i++) { + producer.send(session.createMessage()); + } + conn.close(); + } + + private void sendTextMessages(String queueName, String body, int count) throws Exception { + Connection conn = connectionFactory.createConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + jakarta.jms.Queue dest = session.createQueue(queueName); + MessageProducer producer = session.createProducer(dest); + for (int i = 0; i < count; i++) { + TextMessage msg = session.createTextMessage(body); + producer.send(msg); + } + conn.close(); + } +} diff --git a/activemq-console/src/test/java/org/apache/activemq/console/TopicsCommandTest.java b/activemq-console/src/test/java/org/apache/activemq/console/TopicsCommandTest.java new file mode 100644 index 00000000000..e36a9c98f61 --- /dev/null +++ b/activemq-console/src/test/java/org/apache/activemq/console/TopicsCommandTest.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.util.LinkedList; + +import javax.management.ObjectName; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.TopicViewMBean; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.console.command.TopicsCommand; +import org.apache.activemq.console.formatter.CommandShellOutputFormatter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TopicsCommandTest { + + private BrokerService brokerService; + + @Before + public void createBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.getManagementContext().setCreateConnector(false); + brokerService.setPersistent(false); + brokerService.setDestinations(new ActiveMQDestination[]{ + new ActiveMQTopic("T1"), + new ActiveMQTopic("T2") + }); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + // --- list --- + + @Test(timeout = 30000) + public void testListShowsAllTopics() throws Exception { + String result = execute("list"); + assertTrue("T1 in output", result.contains("T1")); + assertTrue("T2 in output", result.contains("T2")); + assertTrue("column header present", result.contains("Messages")); + assertTrue("column header present", result.contains("Consumers")); + } + + @Test(timeout = 30000) + public void testListDoesNotShowQueues() throws Exception { + String result = execute("list"); + // Queues should never appear in topic list output + assertFalse("queue header not shown", result.contains("Queue Size")); + } + + // --- create --- + + @Test(timeout = 30000) + public void testCreateTopic() throws Exception { + String result = execute("create", "NEW.TOPIC"); + assertTrue("confirmation message", result.contains("Topic created: NEW.TOPIC")); + + // Verify it now appears in list + String listResult = execute("list"); + assertTrue("new topic in list", listResult.contains("NEW.TOPIC")); + } + + // --- delete --- + + @Test(timeout = 30000) + public void testDeleteTopic() throws Exception { + String result = execute("delete", "T1"); + assertTrue("confirmation message", result.contains("Topic deleted: T1")); + + // T1 should no longer appear in list + String listResult = execute("list"); + assertFalse("T1 gone from list", listResult.contains("T1")); + assertTrue("T2 still present", listResult.contains("T2")); + } + + // --- info --- + + @Test(timeout = 30000) + public void testInfoTopic() throws Exception { + String result = execute("info", "T1"); + assertTrue("name field", result.contains("T1")); + assertTrue("Messages field", result.contains("Messages")); + assertTrue("Consumers field", result.contains("Consumers")); + assertTrue("Producers field", result.contains("Producers")); + assertTrue("Memory usage field", result.contains("Memory usage")); + } + + @Test(timeout = 30000) + public void testInfoTopicNotFound() throws Exception { + String result = execute("info", "DOES.NOT.EXIST"); + assertTrue("not found message", result.contains("not found")); + } + + // --- error handling --- + + @Test(timeout = 30000) + public void testUnknownActionShowsHelp() throws Exception { + String result = execute("badaction"); + assertTrue("unknown action message", result.contains("Unknown action")); + } + + @Test(timeout = 30000) + public void testNoActionShowsHelp() throws Exception { + String result = execute(); + assertTrue("help content shown", result.contains("Actions:")); + } + + // --- helpers --- + + private String execute(String... args) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(1024); + CommandContext context = new CommandContext(); + context.setFormatter(new CommandShellOutputFormatter(out)); + + TopicsCommand cmd = new TopicsCommand(); + cmd.setJmxUseLocal(true); + cmd.setCommandContext(context); + + LinkedList tokens = new LinkedList<>(); + for (String arg : args) { + tokens.add(arg); + } + cmd.execute(tokens); + return out.toString(); + } + + private TopicViewMBean getTopicProxy(String topicName) throws Exception { + ObjectName objectName = new ObjectName( + "org.apache.activemq:type=Broker,brokerName=localhost" + + ",destinationType=Topic,destinationName=" + topicName); + return (TopicViewMBean) brokerService.getManagementContext() + .newProxyInstance(objectName, TopicViewMBean.class, true); + } +}