From fec354249fd81809452467d8a06cabc2488125fa Mon Sep 17 00:00:00 2001 From: Pradeep Kunchala Date: Thu, 12 Feb 2026 09:57:18 +0530 Subject: [PATCH 1/2] AMQ-9855: Defensive copy in VMTransport to align behavior with remote transports + unit test --- .../activemq/command/ActiveMQTextMessage.java | 31 +-- .../ActiveMQTextMessageStressTest.java | 187 ++++++++++++++++++ 2 files changed, 206 insertions(+), 12 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageStressTest.java diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java index f0a529af9d4..b45e44f8c34 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java @@ -55,8 +55,10 @@ public Message copy() { } private void copy(ActiveMQTextMessage copy) { - super.copy(copy); - copy.text = text; + synchronized(this) { + super.copy(copy); + copy.text = this.text; + } } @Override @@ -70,14 +72,14 @@ public String getJMSXMimeType() { } @Override - public void setText(String text) throws MessageNotWriteableException { + public synchronized void setText(String text) throws MessageNotWriteableException { checkReadOnlyBody(); this.text = text; setContent(null); } @Override - public String getText() throws JMSException { + public synchronized String getText() throws JMSException { ByteSequence content = getContent(); if (text == null && content != null) { @@ -116,19 +118,19 @@ private String decodeContent(ByteSequence bodyAsBytes) throws JMSException { } @Override - public void beforeMarshall(WireFormat wireFormat) throws IOException { + public synchronized void beforeMarshall(WireFormat wireFormat) throws IOException { super.beforeMarshall(wireFormat); storeContentAndClear(); } @Override - public void storeContentAndClear() { + public synchronized void storeContentAndClear() { storeContent(); text=null; } @Override - public void storeContent() { + public synchronized void storeContent() { try { ByteSequence content = getContent(); String text = this.text; @@ -153,13 +155,18 @@ public void storeContent() { // see https://issues.apache.org/activemq/browse/AMQ-2103 // and https://issues.apache.org/activemq/browse/AMQ-2966 @Override - public void clearUnMarshalledState() throws JMSException { + public synchronized void clearUnMarshalledState() throws JMSException { + // Crucial: Store the content before we wipe the text + // This ensures we don't end up with BOTH being null + if (this.text != null && getContent() == null) { + storeContent(); + } super.clearUnMarshalledState(); this.text = null; } @Override - public boolean isContentMarshalled() { + public synchronized boolean isContentMarshalled() { return content != null || text == null; } @@ -175,13 +182,13 @@ public boolean isContentMarshalled() { * due to some internal error. */ @Override - public void clearBody() throws JMSException { + public synchronized void clearBody() throws JMSException { super.clearBody(); this.text = null; } @Override - public int getSize() { + public synchronized int getSize() { String text = this.text; if (size == 0 && content == null && text != null) { size = getMinimumMessageSize(); @@ -194,7 +201,7 @@ public int getSize() { } @Override - public String toString() { + public synchronized String toString() { try { String text = this.text; if( text == null ) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageStressTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageStressTest.java new file mode 100644 index 00000000000..496f01cfcc6 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageStressTest.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.command; + +import jakarta.jms.Connection; +import jakarta.jms.MessageConsumer; +import jakarta.jms.Session; +import jakarta.jms.Topic; +import jakarta.jms.MessageProducer; +import jakarta.jms.TextMessage; +import jakarta.jms.JMSException; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertEquals; + +public class ActiveMQTextMessageStressTest { + + private static final Logger LOG = LoggerFactory.getLogger(ActiveMQTextMessageStressTest.class); + private BrokerService broker; + private Connection connection; + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("vm://localhost"); + broker.start(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + connection = cf.createConnection(); + connection.setClientID("HIGH_CONC_TEST"); + connection.start(); + } + + @After + public void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + if (broker != null) { + broker.stop(); + } + } + + @Test + public void testConcurrentProducersAndConsumers() throws Exception { + final int MESSAGE_COUNT = 50; + final int PRODUCERS = 2; + final int DURABLE_CONSUMERS = 2; + final int NON_DURABLE_CONSUMERS = 2; + final int TOTAL_CONSUMERS = DURABLE_CONSUMERS + NON_DURABLE_CONSUMERS; + + Session tmpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = tmpSession.createTopic("HIGH_CONC.TOPIC"); + + List consumers = new ArrayList<>(); + List consumerSessions = new ArrayList<>(); + + for (int i = 1; i <= DURABLE_CONSUMERS; i++) { + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumers.add(s.createDurableSubscriber(topic, "Durable-" + i)); + consumerSessions.add(s); + } + for (int i = 1; i <= NON_DURABLE_CONSUMERS; i++) { + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumers.add(s.createConsumer(topic)); + consumerSessions.add(s); + } + + ExecutorService executor = Executors.newFixedThreadPool(PRODUCERS + TOTAL_CONSUMERS); + CountDownLatch producerLatch = new CountDownLatch(PRODUCERS); + + // Producers + for (int p = 1; p <= PRODUCERS; p++) { + final int producerId = p; + executor.submit(() -> { + try { + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = s.createProducer(topic); + for (int m = 1; m <= MESSAGE_COUNT; m++) { + TextMessage msg = s.createTextMessage("P" + producerId + "-M" + m); + producer.send(msg); + } + s.close(); + } catch (JMSException e) { + LOG.error("Producer error", e); + } finally { + producerLatch.countDown(); + } + }); + } + + // Consumers + List>> consumerFutures = new ArrayList<>(); + for (MessageConsumer consumer : consumers) { + consumerFutures.add(executor.submit(() -> { + List received = new ArrayList<>(); + try { + for (int i = 0; i < MESSAGE_COUNT * PRODUCERS; i++) { + TextMessage msg = (TextMessage) consumer.receive(10000); + assertNotNull("Consumer should receive a message", msg); + + // Hammer the message to trigger race condition on unmarshalling + for (int j = 0; j < 10; j++) { + String txt = msg.getText(); + assertNotNull("Text should never be null during stress", txt); + // Clear state to force unmarshalling on the next call + ((ActiveMQTextMessage) msg).clearUnMarshalledState(); + } + received.add(msg); + } + } catch (Exception e) { + LOG.error("Consumer error", e); + } + return received; + })); + } + + producerLatch.await(30, TimeUnit.SECONDS); + + List> allConsumed = new ArrayList<>(); + for (Future> f : consumerFutures) { + allConsumed.add(f.get(30, TimeUnit.SECONDS)); + } + + // Validate independent instances and data integrity + for (int i = 0; i < allConsumed.size(); i++) { + List consumerMsgs = allConsumed.get(i); + assertEquals("Consumer " + i + " did not receive all messages", MESSAGE_COUNT * PRODUCERS, consumerMsgs.size()); + + for (int j = i + 1; j < allConsumed.size(); j++) { + List otherMsgs = allConsumed.get(j); + + for (int k = 0; k < consumerMsgs.size(); k++) { + TextMessage m1 = consumerMsgs.get(k); + TextMessage m2 = otherMsgs.get(k); + + assertNotSame("Message wrappers MUST be different instances across consumers", m1, m2); + assertEquals("Content must match", m1.getText(), m2.getText()); + assertNotNull("Content should not be null", m1.getText()); + } + } + } + + executor.shutdown(); + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + + for (Session s : consumerSessions) { + s.close(); + } + tmpSession.close(); + } +} \ No newline at end of file From 4e41fec1a95994ff8d5b2fa009a7202cfa1c2097 Mon Sep 17 00:00:00 2001 From: Pradeep Kunchala Date: Tue, 17 Feb 2026 00:42:52 +0530 Subject: [PATCH 2/2] Fix: Ensure content is marshaled before clearing unmarshalled state in ActiveMQTextMessage --- .../activemq/command/ActiveMQTextMessage.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java index b45e44f8c34..7c1b7490a47 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java @@ -55,10 +55,8 @@ public Message copy() { } private void copy(ActiveMQTextMessage copy) { - synchronized(this) { super.copy(copy); copy.text = this.text; - } } @Override @@ -72,14 +70,14 @@ public String getJMSXMimeType() { } @Override - public synchronized void setText(String text) throws MessageNotWriteableException { + public void setText(String text) throws MessageNotWriteableException { checkReadOnlyBody(); this.text = text; setContent(null); } @Override - public synchronized String getText() throws JMSException { + public String getText() throws JMSException { ByteSequence content = getContent(); if (text == null && content != null) { @@ -118,19 +116,19 @@ private String decodeContent(ByteSequence bodyAsBytes) throws JMSException { } @Override - public synchronized void beforeMarshall(WireFormat wireFormat) throws IOException { + public void beforeMarshall(WireFormat wireFormat) throws IOException { super.beforeMarshall(wireFormat); storeContentAndClear(); } @Override - public synchronized void storeContentAndClear() { + public void storeContentAndClear() { storeContent(); text=null; } @Override - public synchronized void storeContent() { + public void storeContent() { try { ByteSequence content = getContent(); String text = this.text; @@ -155,7 +153,7 @@ public synchronized void storeContent() { // see https://issues.apache.org/activemq/browse/AMQ-2103 // and https://issues.apache.org/activemq/browse/AMQ-2966 @Override - public synchronized void clearUnMarshalledState() throws JMSException { + public void clearUnMarshalledState() throws JMSException { // Crucial: Store the content before we wipe the text // This ensures we don't end up with BOTH being null if (this.text != null && getContent() == null) { @@ -166,7 +164,7 @@ public synchronized void clearUnMarshalledState() throws JMSException { } @Override - public synchronized boolean isContentMarshalled() { + public boolean isContentMarshalled() { return content != null || text == null; }