Skip to content

Commit fec3542

Browse files
author
Pradeep Kunchala
committed
AMQ-9855: Defensive copy in VMTransport to align behavior with remote transports + unit test
1 parent 7d5291e commit fec3542

File tree

2 files changed

+206
-12
lines changed

2 files changed

+206
-12
lines changed

activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@ public Message copy() {
5555
}
5656

5757
private void copy(ActiveMQTextMessage copy) {
58-
super.copy(copy);
59-
copy.text = text;
58+
synchronized(this) {
59+
super.copy(copy);
60+
copy.text = this.text;
61+
}
6062
}
6163

6264
@Override
@@ -70,14 +72,14 @@ public String getJMSXMimeType() {
7072
}
7173

7274
@Override
73-
public void setText(String text) throws MessageNotWriteableException {
75+
public synchronized void setText(String text) throws MessageNotWriteableException {
7476
checkReadOnlyBody();
7577
this.text = text;
7678
setContent(null);
7779
}
7880

7981
@Override
80-
public String getText() throws JMSException {
82+
public synchronized String getText() throws JMSException {
8183
ByteSequence content = getContent();
8284

8385
if (text == null && content != null) {
@@ -116,19 +118,19 @@ private String decodeContent(ByteSequence bodyAsBytes) throws JMSException {
116118
}
117119

118120
@Override
119-
public void beforeMarshall(WireFormat wireFormat) throws IOException {
121+
public synchronized void beforeMarshall(WireFormat wireFormat) throws IOException {
120122
super.beforeMarshall(wireFormat);
121123
storeContentAndClear();
122124
}
123125

124126
@Override
125-
public void storeContentAndClear() {
127+
public synchronized void storeContentAndClear() {
126128
storeContent();
127129
text=null;
128130
}
129131

130132
@Override
131-
public void storeContent() {
133+
public synchronized void storeContent() {
132134
try {
133135
ByteSequence content = getContent();
134136
String text = this.text;
@@ -153,13 +155,18 @@ public void storeContent() {
153155
// see https://issues.apache.org/activemq/browse/AMQ-2103
154156
// and https://issues.apache.org/activemq/browse/AMQ-2966
155157
@Override
156-
public void clearUnMarshalledState() throws JMSException {
158+
public synchronized void clearUnMarshalledState() throws JMSException {
159+
// Crucial: Store the content before we wipe the text
160+
// This ensures we don't end up with BOTH being null
161+
if (this.text != null && getContent() == null) {
162+
storeContent();
163+
}
157164
super.clearUnMarshalledState();
158165
this.text = null;
159166
}
160167

161168
@Override
162-
public boolean isContentMarshalled() {
169+
public synchronized boolean isContentMarshalled() {
163170
return content != null || text == null;
164171
}
165172

@@ -175,13 +182,13 @@ public boolean isContentMarshalled() {
175182
* due to some internal error.
176183
*/
177184
@Override
178-
public void clearBody() throws JMSException {
185+
public synchronized void clearBody() throws JMSException {
179186
super.clearBody();
180187
this.text = null;
181188
}
182189

183190
@Override
184-
public int getSize() {
191+
public synchronized int getSize() {
185192
String text = this.text;
186193
if (size == 0 && content == null && text != null) {
187194
size = getMinimumMessageSize();
@@ -194,7 +201,7 @@ public int getSize() {
194201
}
195202

196203
@Override
197-
public String toString() {
204+
public synchronized String toString() {
198205
try {
199206
String text = this.text;
200207
if( text == null ) {
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.command;
18+
19+
import jakarta.jms.Connection;
20+
import jakarta.jms.MessageConsumer;
21+
import jakarta.jms.Session;
22+
import jakarta.jms.Topic;
23+
import jakarta.jms.MessageProducer;
24+
import jakarta.jms.TextMessage;
25+
import jakarta.jms.JMSException;
26+
27+
import org.apache.activemq.ActiveMQConnectionFactory;
28+
import org.apache.activemq.broker.BrokerService;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import java.util.List;
36+
import java.util.ArrayList;
37+
import java.util.concurrent.ExecutorService;
38+
import java.util.concurrent.Executors;
39+
import java.util.concurrent.CountDownLatch;
40+
import java.util.concurrent.Future;
41+
import java.util.concurrent.TimeUnit;
42+
43+
import static org.junit.Assert.assertNotNull;
44+
import static org.junit.Assert.assertNotSame;
45+
import static org.junit.Assert.assertEquals;
46+
47+
public class ActiveMQTextMessageStressTest {
48+
49+
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQTextMessageStressTest.class);
50+
private BrokerService broker;
51+
private Connection connection;
52+
53+
@Before
54+
public void setUp() throws Exception {
55+
broker = new BrokerService();
56+
broker.setPersistent(false);
57+
broker.setUseJmx(false);
58+
broker.addConnector("vm://localhost");
59+
broker.start();
60+
61+
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
62+
connection = cf.createConnection();
63+
connection.setClientID("HIGH_CONC_TEST");
64+
connection.start();
65+
}
66+
67+
@After
68+
public void tearDown() throws Exception {
69+
if (connection != null) {
70+
connection.close();
71+
}
72+
if (broker != null) {
73+
broker.stop();
74+
}
75+
}
76+
77+
@Test
78+
public void testConcurrentProducersAndConsumers() throws Exception {
79+
final int MESSAGE_COUNT = 50;
80+
final int PRODUCERS = 2;
81+
final int DURABLE_CONSUMERS = 2;
82+
final int NON_DURABLE_CONSUMERS = 2;
83+
final int TOTAL_CONSUMERS = DURABLE_CONSUMERS + NON_DURABLE_CONSUMERS;
84+
85+
Session tmpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
86+
Topic topic = tmpSession.createTopic("HIGH_CONC.TOPIC");
87+
88+
List<MessageConsumer> consumers = new ArrayList<>();
89+
List<Session> consumerSessions = new ArrayList<>();
90+
91+
for (int i = 1; i <= DURABLE_CONSUMERS; i++) {
92+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
93+
consumers.add(s.createDurableSubscriber(topic, "Durable-" + i));
94+
consumerSessions.add(s);
95+
}
96+
for (int i = 1; i <= NON_DURABLE_CONSUMERS; i++) {
97+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
98+
consumers.add(s.createConsumer(topic));
99+
consumerSessions.add(s);
100+
}
101+
102+
ExecutorService executor = Executors.newFixedThreadPool(PRODUCERS + TOTAL_CONSUMERS);
103+
CountDownLatch producerLatch = new CountDownLatch(PRODUCERS);
104+
105+
// Producers
106+
for (int p = 1; p <= PRODUCERS; p++) {
107+
final int producerId = p;
108+
executor.submit(() -> {
109+
try {
110+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
111+
MessageProducer producer = s.createProducer(topic);
112+
for (int m = 1; m <= MESSAGE_COUNT; m++) {
113+
TextMessage msg = s.createTextMessage("P" + producerId + "-M" + m);
114+
producer.send(msg);
115+
}
116+
s.close();
117+
} catch (JMSException e) {
118+
LOG.error("Producer error", e);
119+
} finally {
120+
producerLatch.countDown();
121+
}
122+
});
123+
}
124+
125+
// Consumers
126+
List<Future<List<TextMessage>>> consumerFutures = new ArrayList<>();
127+
for (MessageConsumer consumer : consumers) {
128+
consumerFutures.add(executor.submit(() -> {
129+
List<TextMessage> received = new ArrayList<>();
130+
try {
131+
for (int i = 0; i < MESSAGE_COUNT * PRODUCERS; i++) {
132+
TextMessage msg = (TextMessage) consumer.receive(10000);
133+
assertNotNull("Consumer should receive a message", msg);
134+
135+
// Hammer the message to trigger race condition on unmarshalling
136+
for (int j = 0; j < 10; j++) {
137+
String txt = msg.getText();
138+
assertNotNull("Text should never be null during stress", txt);
139+
// Clear state to force unmarshalling on the next call
140+
((ActiveMQTextMessage) msg).clearUnMarshalledState();
141+
}
142+
received.add(msg);
143+
}
144+
} catch (Exception e) {
145+
LOG.error("Consumer error", e);
146+
}
147+
return received;
148+
}));
149+
}
150+
151+
producerLatch.await(30, TimeUnit.SECONDS);
152+
153+
List<List<TextMessage>> allConsumed = new ArrayList<>();
154+
for (Future<List<TextMessage>> f : consumerFutures) {
155+
allConsumed.add(f.get(30, TimeUnit.SECONDS));
156+
}
157+
158+
// Validate independent instances and data integrity
159+
for (int i = 0; i < allConsumed.size(); i++) {
160+
List<TextMessage> consumerMsgs = allConsumed.get(i);
161+
assertEquals("Consumer " + i + " did not receive all messages", MESSAGE_COUNT * PRODUCERS, consumerMsgs.size());
162+
163+
for (int j = i + 1; j < allConsumed.size(); j++) {
164+
List<TextMessage> otherMsgs = allConsumed.get(j);
165+
166+
for (int k = 0; k < consumerMsgs.size(); k++) {
167+
TextMessage m1 = consumerMsgs.get(k);
168+
TextMessage m2 = otherMsgs.get(k);
169+
170+
assertNotSame("Message wrappers MUST be different instances across consumers", m1, m2);
171+
assertEquals("Content must match", m1.getText(), m2.getText());
172+
assertNotNull("Content should not be null", m1.getText());
173+
}
174+
}
175+
}
176+
177+
executor.shutdown();
178+
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
179+
executor.shutdownNow();
180+
}
181+
182+
for (Session s : consumerSessions) {
183+
s.close();
184+
}
185+
tmpSession.close();
186+
}
187+
}

0 commit comments

Comments
 (0)