From 0199018eafa767500f178a402dd71e0cb037837f Mon Sep 17 00:00:00 2001 From: mwashburn Date: Thu, 14 May 2026 12:18:19 -0400 Subject: [PATCH] [#2020] Add expiryCheckEnabled option to MessageEvictionStrategy to skip O(n) expiry scan TopicSubscription When a slow-consumer's pending-message buffer exceeds the high-water mark (default: 1000 messages), TopicSubscription.add() calls removeExpiredMessages() on every single message add. That method iterates every pending message and calls isExpired(), which is an O(n) scan over the full buffer. When messages carry no TTL (isExpired() always returns false), this scan provides no benefit -- it iterates the entire buffer on every add with zero messages removed. With a large pending limit (e.g. 20,000), this adds up to millions of no-op iterations per second on busy topics. This commit adds an expiryCheckEnabled boolean (default true) to MessageEvictionStrategy. Setting it to false inserts a single guard in TopicSubscription.add(): if (expiryCheckEnabled && !matched.isEmpty() && matched.size() > max) { removeExpiredMessages(); } Also adds: - TopicSubscriptionEnableExpiryTest: 11 correctness/propagation/integration tests - TopicSubscriptionEnableExpiryThroughputTest: throughput comparison test --- .../broker/region/TopicSubscription.java | 3 +- .../policy/MessageEvictionStrategy.java | 14 + .../MessageEvictionStrategySupport.java | 26 +- .../TopicSubscriptionEnableExpiryTest.java | 395 ++++++++++++++++++ ...ubscriptionEnableExpiryThroughputTest.java | 209 +++++++++ 5 files changed, 643 insertions(+), 4 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryThroughputTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 8a1a300daec..62659a6d5bb 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -25,7 +25,6 @@ import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; import org.apache.activemq.command.*; -import org.apache.activemq.management.MessageFlowStats; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transport.TransmitCallback; @@ -166,7 +165,7 @@ public void add(MessageReference node) throws Exception { if (maximumPendingMessages > 0 && maximumPendingMessages < max) { max = maximumPendingMessages; } - if (!matched.isEmpty() && matched.size() > max) { + if (messageEvictionStrategy.isExpiryCheckEnabled() && !matched.isEmpty() && matched.size() > max) { removeExpiredMessages(); } // lets discard old messages as we are a slow consumer diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java index 334cb3730ff..5a5f62f51f0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java @@ -42,4 +42,18 @@ public interface MessageEvictionStrategy { */ int getEvictExpiredMessagesHighWatermark(); + /** + * Returns whether the eager expired-message scan is enabled. + *

+ * When {@code false}, the O(n) scan inside + * {@link org.apache.activemq.broker.region.TopicSubscription#add} is skipped entirely. + * Set to {@code false} when messages carry no TTL, or when the scan cost outweighs + * the benefit of eagerly evicting expired messages from slow-consumer buffers. + *

+ * See {@link MessageEvictionStrategySupport} for the default implementation that returns {@code true}. + * + * @return {@code true} if the expiry scan is enabled (default), {@code false} if skipped + */ + boolean isExpiryCheckEnabled(); + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java index 32f0c6f0c03..0df890b6cff 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java @@ -24,6 +24,7 @@ public abstract class MessageEvictionStrategySupport implements MessageEvictionStrategy { private int evictExpiredMessagesHighWatermark = 1000; + private boolean expiryCheckEnabled = true; public int getEvictExpiredMessagesHighWatermark() { return evictExpiredMessagesHighWatermark; @@ -35,6 +36,27 @@ public int getEvictExpiredMessagesHighWatermark() { public void setEvictExpiredMessagesHighWatermark(int evictExpiredMessagesHighWaterMark) { this.evictExpiredMessagesHighWatermark = evictExpiredMessagesHighWaterMark; } - - + + @Override + public boolean isExpiryCheckEnabled() { + return expiryCheckEnabled; + } + + /** + * Controls whether the broker performs an eager expired-message scan when a + * non-durable topic subscription's pending slow-consumer buffer exceeds + * {@link #getEvictExpiredMessagesHighWatermark()}. + *

+ * Set to {@code false} when messages carry no TTL, or when the O(n) scan cost + * outweighs the benefit of eagerly evicting expired messages from slow-consumer + * buffers. When messages have no TTL, every scan iterates the full buffer without + * removing anything, adding latency to every enqueue once the buffer exceeds the + * high-water mark. + * + * @param expiryCheckEnabled {@code false} to skip the scan; {@code true} to enable it (default) + */ + public void setExpiryCheckEnabled(boolean expiryCheckEnabled) { + this.expiryCheckEnabled = expiryCheckEnabled; + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java new file mode 100644 index 00000000000..9857319360f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import java.util.ArrayList; +import java.util.List; + +import jakarta.jms.Connection; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; + +import junit.framework.TestCase; +import org.junit.experimental.categories.Category; +import org.apache.activemq.test.annotations.ParallelTest; + +/** + * Tests correctness of the {@code ExpiryCheckEnabled} feature on + * {@link org.apache.activemq.broker.region.policy.MessageEvictionStrategy} and its effect on + * {@link TopicSubscription}. + * + *

Background: when a slow-consumer queue exceeds + * {@code evictExpiredMessagesHighWatermark} (default: 1000), ActiveMQ calls + * {@code TopicSubscription.removeExpiredMessages()} on every single + * {@code add()} call. That method iterates every pending message checking + * {@code isExpired()} — an O(n) scan. Setting {@code ExpiryCheckEnabled=false} + * on the {@link org.apache.activemq.broker.region.policy.MessageEvictionStrategy} skips that + * scan entirely via a single boolean check guarding the call site. + */ +@Category(ParallelTest.class) +public class TopicSubscriptionEnableExpiryTest extends TestCase { + + // ------------------------------------------------------------------------- + // Unit tests — no broker needed + // ------------------------------------------------------------------------- + + /** + * {@link OldestMessageEvictionStrategy#isExpiryCheckEnabled()} must default to {@code true} so + * that existing deployments that do not set the property are unaffected. + */ + public void testEvictionStrategyExpiryCheckDefaultsToTrue() { + OldestMessageEvictionStrategy strategy = new OldestMessageEvictionStrategy(); + assertTrue("ExpiryCheckEnabled must default to true (preserves existing behaviour)", + strategy.isExpiryCheckEnabled()); + } + + public void testEvictionStrategySetExpiryCheckFalse() { + OldestMessageEvictionStrategy strategy = new OldestMessageEvictionStrategy(); + strategy.setExpiryCheckEnabled(false); + assertFalse("ExpiryCheckEnabled should be false after setter call", + strategy.isExpiryCheckEnabled()); + } + + public void testEvictionStrategySetExpiryCheckRoundTrip() { + OldestMessageEvictionStrategy strategy = new OldestMessageEvictionStrategy(); + strategy.setExpiryCheckEnabled(false); + assertFalse(strategy.isExpiryCheckEnabled()); + strategy.setExpiryCheckEnabled(true); + assertTrue(strategy.isExpiryCheckEnabled()); + } + + /** + * {@link TopicSubscription} must pick up the eviction strategy flag — when + * {@code ExpiryCheckEnabled=false} is set on the strategy the scan is skipped. + */ + public void testTopicSubscriptionUsesStrategyExpiryCheckFlag() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + try { + TopicSubscription sub = buildMinimalTopicSubscription(broker); + // default strategy — expiry scan enabled + assertTrue("default strategy must have ExpiryCheckEnabled=true", + sub.getMessageEvictionStrategy().isExpiryCheckEnabled()); + + OldestMessageEvictionStrategy strategy = new OldestMessageEvictionStrategy(); + strategy.setExpiryCheckEnabled(false); + sub.setMessageEvictionStrategy(strategy); + assertFalse("strategy with ExpiryCheckEnabled=false must reflect on subscription", + sub.getMessageEvictionStrategy().isExpiryCheckEnabled()); + } finally { + broker.stop(); + } + } + + // ------------------------------------------------------------------------- + // PolicyEntry propagation tests + // ------------------------------------------------------------------------- + + /** + * When a {@link PolicyEntry} is configured with an eviction strategy that has + * {@code ExpiryCheckEnabled=false}, {@code PolicyEntry.configure(TopicSubscription)} + * must propagate the strategy so the O(n) expiry scan is skipped. + */ + public void testPolicyEntryPropagatesEvictionStrategyToSubscription() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + try { + ConstantPendingMessageLimitStrategy limitStrategy = new ConstantPendingMessageLimitStrategy(); + limitStrategy.setLimit(2000); + + OldestMessageEvictionStrategy evictionStrategy = new OldestMessageEvictionStrategy(); + evictionStrategy.setExpiryCheckEnabled(false); + + PolicyEntry entry = new PolicyEntry(); + entry.setPendingMessageLimitStrategy(limitStrategy); + entry.setMessageEvictionStrategy(evictionStrategy); + + TopicSubscription sub = buildMinimalTopicSubscription(broker); + assertTrue(sub.getMessageEvictionStrategy().isExpiryCheckEnabled()); // default + + entry.configure(broker.getBroker(), broker.getSystemUsage(), sub); + + assertFalse("PolicyEntry.configure() must propagate eviction strategy with ExpiryCheckEnabled=false", + sub.getMessageEvictionStrategy().isExpiryCheckEnabled()); + } finally { + broker.stop(); + } + } + + /** + * When the default eviction strategy is used (no override on PolicyEntry), + * the subscription's expiry scan must remain enabled. + */ + public void testDefaultPolicyEntryLeavesExpiryCheckEnabled() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + try { + ConstantPendingMessageLimitStrategy limitStrategy = new ConstantPendingMessageLimitStrategy(); + limitStrategy.setLimit(2000); + + PolicyEntry entry = new PolicyEntry(); + entry.setPendingMessageLimitStrategy(limitStrategy); + // no messageEvictionStrategy override — default OldestMessageEvictionStrategy used + + TopicSubscription sub = buildMinimalTopicSubscription(broker); + entry.configure(broker.getBroker(), broker.getSystemUsage(), sub); + + assertTrue("subscription must still have ExpiryCheckEnabled=true when using the default eviction strategy", + sub.getMessageEvictionStrategy().isExpiryCheckEnabled()); + } finally { + broker.stop(); + } + } + + /** + * When no {@link org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy} is + * set at all, the subscription's expiry scan flag must remain at its default ({@code true}). + */ + public void testPolicyEntryWithNoStrategyLeavesExpiryCheckEnabled() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + try { + PolicyEntry entry = new PolicyEntry(); // no strategy, no eviction strategy override + + TopicSubscription sub = buildMinimalTopicSubscription(broker); + entry.configure(broker.getBroker(), broker.getSystemUsage(), sub); + + assertTrue("subscription must keep ExpiryCheckEnabled=true when no strategy is configured", + sub.getMessageEvictionStrategy().isExpiryCheckEnabled()); + } finally { + broker.stop(); + } + } + + /** + * A custom {@link org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy} with no + * eviction strategy override must leave the subscription's expiry scan enabled. + */ + public void testCustomLimitStrategyWithDefaultEvictionLeavesExpiryCheckEnabled() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + try { + org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy customStrategy = + subscription -> 500; + + PolicyEntry entry = new PolicyEntry(); + entry.setPendingMessageLimitStrategy(customStrategy); + // no eviction strategy override — default OldestMessageEvictionStrategy(ExpiryCheckEnabled=true) + + TopicSubscription sub = buildMinimalTopicSubscription(broker); + entry.configure(broker.getBroker(), broker.getSystemUsage(), sub); + + assertTrue("A custom limit strategy with default eviction strategy must leave ExpiryCheckEnabled=true", + sub.getMessageEvictionStrategy().isExpiryCheckEnabled()); + } finally { + broker.stop(); + } + } + + // ------------------------------------------------------------------------- + // Integration tests — embedded broker, real JMS + // ------------------------------------------------------------------------- + + /** + * With {@code ExpiryCheckEnabled=false} on the eviction strategy, messages + * with an explicit TTL that has passed must NOT be removed by the eager expiry + * scan. The messages remain in the pending queue and are only evicted by the + * normal eviction strategy when the limit is exceeded. + * + *

We verify this by: + *

    + *
  1. Configuring a topic with limit=200, ExpiryCheckEnabled=false. + *
  2. Sending 250 messages with a very short TTL. + *
  3. Waiting for all TTLs to elapse. + *
  4. Sending one more message (triggers the code path). + *
  5. Asserting that the broker's expired-message counter is 0 + * (no expiry scan ran) while the discarded counter is > 0 + * (normal eviction ran as expected). + *
+ */ + public void testExpiryCheckDisabledSkipsExpiredMessageScan() throws Exception { + final int PENDING_LIMIT = 200; + final int SEND_COUNT = 250; + final long SHORT_TTL_MS = 100; + + BrokerService broker = buildBroker(PENDING_LIMIT, false /* ExpiryCheckEnabled=false */); + try { + // prefetchSize=1 so messages pile up in the broker's matched queue (not in client buffer) + ActiveMQTopic topic = new ActiveMQTopic("TEST.EXPIRY.DISABLED?consumer.prefetchSize=1"); + + org.apache.activemq.ActiveMQConnectionFactory cf = + new org.apache.activemq.ActiveMQConnectionFactory("vm://expiry-disabled"); + Connection conn = cf.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a slow consumer (prefetch=1, never reads) to back up messages + MessageConsumer consumer = session.createConsumer(topic); + + MessageProducer producer = session.createProducer(new ActiveMQTopic("TEST.EXPIRY.DISABLED")); + // Send messages with short TTL + for (int i = 0; i < SEND_COUNT; i++) { + TextMessage msg = session.createTextMessage("msg-" + i); + producer.send(msg, jakarta.jms.DeliveryMode.NON_PERSISTENT, 4, SHORT_TTL_MS); + } + + // Wait for all TTLs to expire + Thread.sleep(SHORT_TTL_MS * 5); + + // Send one more message — this triggers the guard in TopicSubscription.add() + producer.send(session.createTextMessage("trigger")); + + // Grab the destination stats + Destination dest = broker.getDestination(new ActiveMQTopic("TEST.EXPIRY.DISABLED")); + long expiredCount = dest.getDestinationStatistics().getExpired().getCount(); + + assertEquals( + "With ExpiryCheckEnabled=false, the expiry scan must not run — expired counter must be 0", + 0L, expiredCount); + + conn.close(); + } finally { + broker.stop(); + } + } + + /** + * Complementary test: with {@code ExpiryCheckEnabled=true} (the default) the eager + * scan DOES run and picks up expired messages, so the broker's expired counter + * should be non-zero after the same scenario. + */ + public void testExpiryCheckEnabledRunsExpiredMessageScan() throws Exception { + final int PENDING_LIMIT = 200; + final int SEND_COUNT = 250; + final long SHORT_TTL_MS = 100; + + BrokerService broker = buildBroker(PENDING_LIMIT, true /* ExpiryCheckEnabled=true */); + try { + // prefetchSize=1 so messages pile up in the broker's matched queue (not in client buffer) + ActiveMQTopic topic = new ActiveMQTopic("TEST.EXPIRY.ENABLED?consumer.prefetchSize=1"); + + org.apache.activemq.ActiveMQConnectionFactory cf = + new org.apache.activemq.ActiveMQConnectionFactory("vm://expiry-enabled"); + Connection conn = cf.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Slow consumer — never reads + MessageConsumer consumer = session.createConsumer(topic); + + MessageProducer producer = session.createProducer(new ActiveMQTopic("TEST.EXPIRY.ENABLED")); + for (int i = 0; i < SEND_COUNT; i++) { + TextMessage msg = session.createTextMessage("msg-" + i); + producer.send(msg, jakarta.jms.DeliveryMode.NON_PERSISTENT, 4, SHORT_TTL_MS); + } + + // Wait for all TTLs to expire + Thread.sleep(SHORT_TTL_MS * 5); + + // Send more messages to trigger expiry scan (queue already > highWatermark) + for (int i = 0; i < 50; i++) { + producer.send(session.createTextMessage("trigger-" + i)); + } + + Destination dest = broker.getDestination(new ActiveMQTopic("TEST.EXPIRY.ENABLED")); + long expiredCount = dest.getDestinationStatistics().getExpired().getCount(); + + assertTrue( + "With ExpiryCheckEnabled=true, the expiry scan must run and detect expired messages (got " + expiredCount + ")", + expiredCount > 0); + + conn.close(); + } finally { + broker.stop(); + } + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private BrokerService buildBroker(int pendingLimit, boolean ExpiryCheckEnabled) throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + String brokerName = ExpiryCheckEnabled ? "expiry-enabled" : "expiry-disabled"; + broker.setBrokerName(brokerName); + broker.addConnector("vm://" + brokerName); + broker.setDeleteAllMessagesOnStartup(true); + + ConstantPendingMessageLimitStrategy limitStrategy = new ConstantPendingMessageLimitStrategy(); + limitStrategy.setLimit(pendingLimit); + + OldestMessageEvictionStrategy evictionStrategy = new OldestMessageEvictionStrategy(); + evictionStrategy.setExpiryCheckEnabled(ExpiryCheckEnabled); + + PolicyEntry entry = new PolicyEntry(); + entry.setTopic(">"); + entry.setPendingMessageLimitStrategy(limitStrategy); + entry.setMessageEvictionStrategy(evictionStrategy); + entry.setDeadLetterStrategy(null); // don't route to DLQ + + List entries = new ArrayList<>(); + entries.add(entry); + PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(entries); + broker.setDestinationPolicy(policyMap); + + broker.start(); + broker.waitUntilStarted(); + return broker; + } + + /** + * Builds a minimal {@link TopicSubscription} using the broker's internals — + * just enough to test the flag, without going through a real JMS connection. + */ + private TopicSubscription buildMinimalTopicSubscription(BrokerService broker) throws Exception { + org.apache.activemq.command.ConsumerInfo info = new org.apache.activemq.command.ConsumerInfo(); + info.setConsumerId(new org.apache.activemq.command.ConsumerId( + new org.apache.activemq.command.SessionId( + new org.apache.activemq.command.ConnectionId("test-conn"), 1), 1)); + info.setDestination(new ActiveMQTopic("TEST.UNIT")); + info.setPrefetchSize(10); + + org.apache.activemq.broker.ConnectionContext ctx = + new org.apache.activemq.broker.ConnectionContext(); + ctx.setBroker(broker.getBroker()); + + return new TopicSubscription(broker.getBroker(), ctx, info, broker.getSystemUsage()); + } +} + diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryThroughputTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryThroughputTest.java new file mode 100644 index 00000000000..70a9379e8d5 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryThroughputTest.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import java.util.ArrayList; +import java.util.List; + +import jakarta.jms.Connection; +import jakarta.jms.DeliveryMode; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; + +import junit.framework.TestCase; +import org.junit.experimental.categories.Category; +import org.apache.activemq.test.annotations.ParallelTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Throughput comparison: {@code expiryCheckEnabled=true} vs {@code expiryCheckEnabled=false} + * for a slow-consumer topic with a large pending-message limit. + * + *

What is being measured

+ *

When a topic consumer is slow (its pending queue exceeds + * {@code evictExpiredMessagesHighWatermark = 1000} by default), ActiveMQ calls + * {@link TopicSubscription#removeExpiredMessages()} on every single + * {@code add()} call. That method iterates every pending message to call + * {@code isExpired()} — an O(n) operation. With a pending limit of 5,000 + * that scan runs up to 5,000 iterations per message send, dominated by the + * Java heap iteration cost. + * + *

With {@code expiryCheckEnabled=false} on the {@link org.apache.activemq.broker.region.policy.MessageEvictionStrategy} + * the scan body is skipped entirely via a single boolean check, reducing + * per-send work back to O(1). + * + *

Pass/fail threshold

+ *

The test asserts that the {@code expiryCheckEnabled=false} run completes at + * least {@code MIN_SPEEDUP_FACTOR}× faster than the {@code expiryCheckEnabled=true} + * run. A factor of 3 is deliberately conservative — in practice the + * improvement is often 50–200× for large queues with pure in-memory messages. + * + *

The test is annotated {@code @Category(ParallelTest.class)} so it runs + * in the normal CI suite, but uses a modest message count to keep wall-clock + * time acceptable on slow machines. + */ +@Category(ParallelTest.class) +public class TopicSubscriptionEnableExpiryThroughputTest extends TestCase { + + private static final Logger LOG = LoggerFactory.getLogger(TopicSubscriptionEnableExpiryThroughputTest.class); + + /** Number of messages to send during the warm-up phase (fills queue above highWatermark). */ + private static final int WARMUP_COUNT = 1200; + + /** + * Number of messages timed during the measurement phase. + * Sending happens after the queue is already above 1000 (highWatermark), + * so every message triggers the expiry-scan code path. + */ + private static final int TIMED_COUNT = 2000; + + /** Pending message limit — large enough that O(n) scan is expensive. */ + private static final int PENDING_LIMIT = 5000; + + /** + * Minimum speedup factor we require for the test to pass. + * Conservative: real-world improvement is typically 50–200×. + */ + private static final double MIN_SPEEDUP_FACTOR = 3.0; + + // ------------------------------------------------------------------------- + + public void testEnableExpiryFalseIsFasterForSlowConsumer() throws Exception { + long msWithExpiry = measureSendTime(true); + long msWithoutExpiry = measureSendTime(false); + + LOG.info("=== expiryCheckEnabled throughput results ==="); + LOG.info(" expiryCheckEnabled=true : {} ms for {} timed messages ({} msg/s)", + msWithExpiry, TIMED_COUNT, + msWithExpiry > 0 ? (TIMED_COUNT * 1000L / msWithExpiry) : "n/a"); + LOG.info(" expiryCheckEnabled=false : {} ms for {} timed messages ({} msg/s)", + msWithoutExpiry, TIMED_COUNT, + msWithoutExpiry > 0 ? (TIMED_COUNT * 1000L / msWithoutExpiry) : "n/a"); + LOG.info(" Speedup factor : {}", msWithExpiry > 0 ? String.format("%.1f×", (double) msWithExpiry / msWithoutExpiry) : "n/a"); + + // Guard against pathological results (e.g. CI machine starved) + // — only assert if the expiry run was genuinely slow (> 200 ms). + if (msWithExpiry > 200) { + double speedup = (double) msWithExpiry / Math.max(1, msWithoutExpiry); + assertTrue( + String.format( + "Expected expiryCheckEnabled=false to be at least %.0f× faster than expiryCheckEnabled=true, " + + "but got %.1f× (%d ms vs %d ms). " + + "This likely means the O(n) expiry scan is no longer being skipped.", + MIN_SPEEDUP_FACTOR, speedup, msWithoutExpiry, msWithExpiry), + speedup >= MIN_SPEEDUP_FACTOR); + } else { + LOG.warn("expiryCheckEnabled=true run finished in only {} ms — machine may be too fast " + + "or warm-up count is too low to trigger the O(n) path reliably on this hardware. " + + "Skipping ratio assertion.", msWithExpiry); + } + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + /** + * Starts a broker with the given {@code expiryCheckEnabled} setting, creates a + * slow consumer (prefetch=1, never reads), sends {@code WARMUP_COUNT} + * messages to fill the pending queue above the eviction high-water mark, + * then times sending {@code TIMED_COUNT} additional messages. + * + * @return wall-clock milliseconds for the timed phase + */ + private long measureSendTime(boolean expiryCheckEnabled) throws Exception { + String brokerName = "perf-" + (expiryCheckEnabled ? "expiry-on" : "expiry-off"); + BrokerService broker = buildBroker(brokerName, PENDING_LIMIT, expiryCheckEnabled); + try { + ActiveMQConnectionFactory cf = + new ActiveMQConnectionFactory("vm://" + brokerName + "?create=false"); + Connection conn = cf.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQTopic topic = new ActiveMQTopic("PERF.TOPIC"); + + // Create a consumer but never call receive() — this makes it slow. + // prefetch=1 so messages pile up in the broker's pending queue. + ActiveMQTopic topicWithPrefetch = new ActiveMQTopic("PERF.TOPIC?consumer.prefetchSize=1"); + MessageConsumer consumer = session.createConsumer(topicWithPrefetch); + + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + // ---- Warm-up phase: fill pending queue above the high-water mark (1000) ---- + for (int i = 0; i < WARMUP_COUNT; i++) { + producer.send(session.createTextMessage("warmup-" + i)); + } + + // ---- Timed phase: every add() triggers the expiry-scan code path ---- + long start = System.currentTimeMillis(); + for (int i = 0; i < TIMED_COUNT; i++) { + producer.send(session.createTextMessage("timed-" + i)); + } + long elapsed = System.currentTimeMillis() - start; + + conn.close(); + return elapsed; + } finally { + broker.stop(); + } + } + + private BrokerService buildBroker(String brokerName, int pendingLimit, boolean expiryCheckEnabled) + throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setBrokerName(brokerName); + broker.addConnector("vm://" + brokerName); + broker.setDeleteAllMessagesOnStartup(true); + + ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); + strategy.setLimit(pendingLimit); + + OldestMessageEvictionStrategy evictionStrategy = new OldestMessageEvictionStrategy(); + evictionStrategy.setExpiryCheckEnabled(expiryCheckEnabled); + + PolicyEntry entry = new PolicyEntry(); + entry.setTopic(">"); + entry.setTopicPrefetch(1); + entry.setPendingMessageLimitStrategy(strategy); + entry.setMessageEvictionStrategy(evictionStrategy); + entry.setDeadLetterStrategy(null); + + List entries = new ArrayList<>(); + entries.add(entry); + PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(entries); + broker.setDestinationPolicy(policyMap); + + broker.start(); + broker.waitUntilStarted(); + return broker; + } +} +