Skip to content

Commit ba1b109

Browse files
author
Pradeep Kunchala
committed
AMQ-9855: Defensive copy in VMTransport to align behavior with remote transports + unit test
1 parent 7d5291e commit ba1b109

File tree

2 files changed

+191
-6
lines changed

2 files changed

+191
-6
lines changed

activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
import java.util.concurrent.atomic.AtomicBoolean;
2727
import java.util.concurrent.atomic.AtomicLong;
2828

29+
import org.apache.activemq.command.MessageDispatch;
30+
import org.apache.activemq.command.ActiveMQMessage;
2931
import org.apache.activemq.command.ShutdownInfo;
32+
import org.apache.activemq.openwire.OpenWireFormat;
3033
import org.apache.activemq.thread.Task;
3134
import org.apache.activemq.thread.TaskRunner;
3235
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -35,6 +38,7 @@
3538
import org.apache.activemq.transport.Transport;
3639
import org.apache.activemq.transport.TransportDisposedIOException;
3740
import org.apache.activemq.transport.TransportListener;
41+
import org.apache.activemq.util.ByteSequence;
3842
import org.apache.activemq.util.IOExceptionSupport;
3943
import org.apache.activemq.wireformat.WireFormat;
4044
import org.slf4j.Logger;
@@ -80,10 +84,6 @@ public void setPeer(VMTransport peer) {
8084
@Override
8185
public void oneway(Object command) throws IOException {
8286

83-
if (disposed.get()) {
84-
throw new TransportDisposedIOException("Transport disposed.");
85-
}
86-
8787
if (peer == null) {
8888
throw new IOException("Peer not connected.");
8989
}
@@ -94,6 +94,29 @@ public void oneway(Object command) throws IOException {
9494
throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
9595
}
9696

97+
// Deep copy the message if it is a MessageDispatch
98+
Object toSend = command;
99+
if (command instanceof MessageDispatch) {
100+
MessageDispatch original = (MessageDispatch) command;
101+
try {
102+
WireFormat wf = new OpenWireFormat();
103+
ByteSequence data = wf.marshal(original);
104+
toSend = wf.unmarshal(data); // deep copy
105+
} catch (IOException e) {
106+
LOG.warn("Failed to deep copy MessageDispatch, sending original", e);
107+
toSend = command;
108+
}
109+
} else if (command instanceof ActiveMQMessage) {
110+
ActiveMQMessage original = (ActiveMQMessage) command;
111+
try {
112+
WireFormat wf = new OpenWireFormat();
113+
ByteSequence data = wf.marshal(original);
114+
toSend = (ActiveMQMessage) wf.unmarshal(data);
115+
} catch (IOException e) {
116+
throw new RuntimeException("Failed to deep copy MessageDispatch in VM transport", e);
117+
}
118+
}
119+
97120
if (peer.async) {
98121
peer.getMessageQueue().put(command);
99122
peer.wakeup();
@@ -124,14 +147,18 @@ public void oneway(Object command) throws IOException {
124147
return;
125148
}
126149
}
150+
151+
// Dispatch to listener
152+
dispatch(peer, peer.messageQueue, toSend);
153+
return;
154+
127155
} catch (InterruptedException e) {
128156
Thread.currentThread().interrupt();
129157
InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
130158
iioe.initCause(e);
131159
throw iioe;
132160
}
133161

134-
dispatch(peer, peer.messageQueue, command);
135162
}
136163

137164
public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
@@ -163,6 +190,7 @@ public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Objec
163190
public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
164191
transport.receiveCounter++;
165192
transportListener.onCommand(command);
193+
166194
}
167195

168196
@Override
@@ -255,7 +283,7 @@ protected void wakeup() {
255283
}
256284

257285
/**
258-
* @see org.apache.activemq.thread.Task#iterate()
286+
* @see Task#iterate()
259287
*/
260288
@Override
261289
public boolean iterate() {
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.transport.vm;
18+
19+
import jakarta.jms.Connection;
20+
import jakarta.jms.MessageConsumer;
21+
import jakarta.jms.Session;
22+
import jakarta.jms.Topic;
23+
import jakarta.jms.MessageProducer;
24+
import jakarta.jms.TextMessage;
25+
import jakarta.jms.JMSException;
26+
27+
import org.apache.activemq.ActiveMQConnectionFactory;
28+
import org.apache.activemq.broker.BrokerService;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
33+
import java.util.List;
34+
import java.util.ArrayList;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.Future;
39+
import java.util.concurrent.TimeUnit;
40+
41+
import static org.junit.Assert.assertNotNull;
42+
import static org.junit.Assert.assertNotSame;
43+
44+
45+
public class VMTransportDefensiveCopyTest {
46+
47+
private BrokerService broker;
48+
private Connection connection;
49+
50+
@Before
51+
public void setUp() throws Exception {
52+
broker = new BrokerService();
53+
broker.setPersistent(false);
54+
broker.addConnector("vm://localhost");
55+
broker.start();
56+
57+
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
58+
connection = cf.createConnection();
59+
connection.setClientID("HIGH_CONC_TEST"); // needed for durable subscribers
60+
connection.start();
61+
}
62+
63+
@After
64+
public void tearDown() throws Exception {
65+
if (connection != null) connection.close();
66+
if (broker != null) broker.stop();
67+
}
68+
69+
@Test
70+
public void testConcurrentProducersAndConsumers() throws Exception {
71+
final int MESSAGE_COUNT = 100;
72+
final int PRODUCERS = 5;
73+
final int DURABLE_CONSUMERS = 2;
74+
final int NON_DURABLE_CONSUMERS = 3;
75+
76+
// Topic
77+
Session tmpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
78+
Topic topic = tmpSession.createTopic("HIGH_CONC.TOPIC");
79+
80+
// Consumers
81+
List<MessageConsumer> consumers = new ArrayList<>();
82+
List<Session> consumerSessions = new ArrayList<>();
83+
for (int i = 1; i <= DURABLE_CONSUMERS; i++) {
84+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
85+
consumers.add(s.createDurableSubscriber(topic, "Durable-" + i));
86+
consumerSessions.add(s);
87+
}
88+
for (int i = 1; i <= NON_DURABLE_CONSUMERS; i++) {
89+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
90+
consumers.add(s.createConsumer(topic));
91+
consumerSessions.add(s);
92+
}
93+
94+
ExecutorService executor = Executors.newFixedThreadPool(PRODUCERS + consumers.size());
95+
96+
// Produce messages concurrently
97+
CountDownLatch producerLatch = new CountDownLatch(PRODUCERS);
98+
for (int p = 1; p <= PRODUCERS; p++) {
99+
final int producerId = p;
100+
executor.submit(() -> {
101+
try {
102+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
103+
MessageProducer producer = s.createProducer(topic);
104+
for (int m = 1; m <= MESSAGE_COUNT; m++) {
105+
TextMessage msg = s.createTextMessage("P" + producerId + "-M" + m);
106+
producer.send(msg);
107+
}
108+
} catch (JMSException e) {
109+
e.printStackTrace();
110+
} finally {
111+
producerLatch.countDown();
112+
}
113+
});
114+
}
115+
116+
// Consume messages concurrently
117+
List<Future<List<String>>> consumerFutures = new ArrayList<>();
118+
for (MessageConsumer consumer : consumers) {
119+
consumerFutures.add(executor.submit(() -> {
120+
List<String> received = new ArrayList<>();
121+
for (int i = 0; i < MESSAGE_COUNT * PRODUCERS; i++) {
122+
TextMessage msg = (TextMessage) consumer.receive(5000);
123+
assertNotNull("Consumer should receive a message", msg);
124+
received.add(msg.getText());
125+
}
126+
return received;
127+
}));
128+
}
129+
130+
// Wait for producers to finish
131+
producerLatch.await();
132+
133+
// Collect and validate consumer messages
134+
List<List<String>> allConsumed = new ArrayList<>();
135+
for (Future<List<String>> f : consumerFutures) {
136+
allConsumed.add(f.get(30, TimeUnit.SECONDS));
137+
}
138+
139+
// Check that each consumer received unique message instances
140+
for (int i = 0; i < allConsumed.size(); i++) {
141+
List<String> consumerMsgs = allConsumed.get(i);
142+
for (int j = i + 1; j < allConsumed.size(); j++) {
143+
List<String> otherMsgs = allConsumed.get(j);
144+
for (int k = 0; k < consumerMsgs.size(); k++) {
145+
assertNotSame(
146+
"Message instances should be independent across consumers",
147+
consumerMsgs.get(k),
148+
otherMsgs.get(k)
149+
);
150+
}
151+
}
152+
}
153+
154+
executor.shutdownNow();
155+
for (Session s : consumerSessions) s.close();
156+
}
157+
}

0 commit comments

Comments
 (0)