Skip to content

Commit a8fa4b0

Browse files
authored
[AMQ-9860] [AMQ-9861] connection async exception handling and activemq pool tests connection leaks to other tests (#1657)
* [AMQ-9861] activemq-pool tests have resource leaks causing test interference activemq-pool tests use default broker name causing BrokerRegistry collisions Missing waitUntilStopped() after broker.stop() in tests * [AMQ-9860] Refactor async exception handling to improve resource management and prevent task execution on closed connections * Increase timeout for exception event propagation in ConnectionFailureEvictsFromPoolTest * Refactor PooledConnectionFactory to remove AutoCloseable implementation and improve resource management in tests
1 parent 7795308 commit a8fa4b0

File tree

7 files changed

+176
-132
lines changed

7 files changed

+176
-132
lines changed

activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java

Lines changed: 47 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.CopyOnWriteArrayList;
3030
import java.util.concurrent.CountDownLatch;
3131
import java.util.concurrent.LinkedBlockingQueue;
32+
import java.util.concurrent.RejectedExecutionException;
3233
import java.util.concurrent.RejectedExecutionHandler;
3334
import java.util.concurrent.ThreadFactory;
3435
import java.util.concurrent.ThreadPoolExecutor;
@@ -1949,12 +1950,7 @@ public Response processBrokerInfo(BrokerInfo info) throws Exception {
19491950

19501951
@Override
19511952
public Response processConnectionError(final ConnectionError error) throws Exception {
1952-
executor.execute(new Runnable() {
1953-
@Override
1954-
public void run() {
1955-
onAsyncException(error.getException());
1956-
}
1957-
});
1953+
executeAsync(() -> onAsyncException(error.getException()));
19581954
return null;
19591955
}
19601956

@@ -2018,18 +2014,12 @@ protected void onWireFormatInfo(WireFormatInfo info) {
20182014
* @param error the exception that the problem
20192015
*/
20202016
public void onClientInternalException(final Throwable error) {
2021-
if ( !closed.get() && !closing.get() ) {
2022-
if ( this.clientInternalExceptionListener != null ) {
2023-
executor.execute(new Runnable() {
2024-
@Override
2025-
public void run() {
2026-
ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
2027-
}
2028-
});
2029-
} else {
2030-
LOG.debug("Async client internal exception occurred with no exception listener registered: {}",
2031-
error, error);
2017+
if (this.clientInternalExceptionListener != null) {
2018+
if (!executeAsync(() -> clientInternalExceptionListener.onException(error))) {
2019+
LOG.debug("Async client internal exception occurred but connection is closing: {}", error, error);
20322020
}
2021+
} else {
2022+
LOG.debug("Async client internal exception occurred with no exception listener registered: {}", error, error);
20332023
}
20342024
}
20352025

@@ -2045,14 +2035,8 @@ public void onAsyncException(Throwable error) {
20452035
if (!(error instanceof JMSException)) {
20462036
error = JMSExceptionSupport.create(error);
20472037
}
2048-
final JMSException e = (JMSException)error;
2049-
2050-
executor.execute(new Runnable() {
2051-
@Override
2052-
public void run() {
2053-
ActiveMQConnection.this.exceptionListener.onException(e);
2054-
}
2055-
});
2038+
final JMSException e = (JMSException) error;
2039+
executeAsync(() -> exceptionListener.onException(e));
20562040

20572041
} else {
20582042
LOG.debug("Async exception with no exception listener: {}", error, error);
@@ -2063,25 +2047,19 @@ public void run() {
20632047
@Override
20642048
public void onException(final IOException error) {
20652049
onAsyncException(error);
2066-
if (!closed.get() && !closing.get()) {
2067-
executor.execute(new Runnable() {
2068-
@Override
2069-
public void run() {
2070-
transportFailed(error);
2071-
ServiceSupport.dispose(ActiveMQConnection.this.transport);
2072-
brokerInfoReceived.countDown();
2073-
try {
2074-
doCleanup(true);
2075-
} catch (JMSException e) {
2076-
LOG.warn("Exception during connection cleanup, " + e, e);
2077-
}
2078-
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2079-
TransportListener listener = iter.next();
2080-
listener.onException(error);
2081-
}
2082-
}
2083-
});
2084-
}
2050+
executeAsync(() -> {
2051+
transportFailed(error);
2052+
ServiceSupport.dispose(ActiveMQConnection.this.transport);
2053+
brokerInfoReceived.countDown();
2054+
try {
2055+
doCleanup(true);
2056+
} catch (JMSException e) {
2057+
LOG.warn("Exception during connection cleanup, " + e, e);
2058+
}
2059+
for (final TransportListener listener : transportListeners) {
2060+
listener.onException(error);
2061+
}
2062+
});
20852063
}
20862064

20872065
@Override
@@ -2490,6 +2468,31 @@ protected ThreadPoolExecutor getExecutor() {
24902468
return this.executor;
24912469
}
24922470

2471+
/**
2472+
* Safely executes a task on the connection's executor, handling the case where
2473+
* the executor may be shutdown due to connection closure. See #close() above.
2474+
* <p>
2475+
* We need to check if the connection is closed/closing and if the executor
2476+
* is shutdown before attempting to execute anything. We also need to catch
2477+
* {@link RejectedExecutionException} to handle check and call senario.
2478+
*
2479+
* @param task the task to execute
2480+
* @return true if the task was submitted successfully, false if the executor
2481+
* was unavailable (connection closing or executor shutdown)
2482+
*/
2483+
private boolean executeAsync(final Runnable task) {
2484+
if (closed.get() || closing.get() || executor.isShutdown()) {
2485+
return false;
2486+
}
2487+
try {
2488+
executor.execute(task);
2489+
return true;
2490+
2491+
} catch (final RejectedExecutionException e) {
2492+
return false; // connection already closing probably
2493+
}
2494+
}
2495+
24932496
protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
24942497
return sessions;
24952498
}

activemq-pool/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@
126126

127127
<build>
128128
<plugins>
129+
<plugin>
130+
<artifactId>maven-surefire-plugin</artifactId>
131+
<configuration>
132+
<reuseForks>false</reuseForks>
133+
</configuration>
134+
</plugin>
129135
<plugin>
130136
<groupId>org.apache.felix</groupId>
131137
<artifactId>maven-bundle-plugin</artifactId>

activemq-pool/src/test/java/org/apache/activemq/pool/ConfigFromPropsTest.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@
2424

2525
public class ConfigFromPropsTest {
2626

27-
XaPooledConnectionFactory underTest;
28-
2927
@Test
3028
public void testBrokerUrlForRarAdminObject() throws Exception {
31-
underTest = new XaPooledConnectionFactory();
32-
underTest.setBrokerUrl("vm://localhost?broker.persistent=false");
33-
Connection connection = underTest.createConnection();
34-
assertNotNull(connection);
35-
connection.close();
36-
assertNotNull(underTest.getBrokerUrl());
29+
final XaPooledConnectionFactory underTest = new XaPooledConnectionFactory();
30+
try {
31+
underTest.setBrokerUrl("vm://configFromPropsTest?broker.persistent=false");
32+
final Connection connection = underTest.createConnection();
33+
assertNotNull(connection);
34+
connection.close();
35+
assertNotNull(underTest.getBrokerUrl());
36+
} finally {
37+
underTest.stop();
38+
}
3739
}
3840
}

activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java

Lines changed: 50 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -49,71 +49,81 @@ public class ConnectionFailureEvictsFromPoolTest extends TestSupport {
4949

5050
protected void setUp() throws Exception {
5151
broker = new BrokerService();
52+
broker.setBrokerName("connectionFailureEvictsFromPoolTest");
5253
broker.setUseJmx(false);
5354
broker.setPersistent(false);
5455
connector = broker.addConnector("tcp://localhost:0");
5556
broker.start();
5657
}
5758

5859
public void testEnhancedConnection() throws Exception {
59-
XaPooledConnectionFactory pooledFactory =
60+
final XaPooledConnectionFactory pooledFactory =
6061
new XaPooledConnectionFactory(new ActiveMQXAConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false"));
61-
62-
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
63-
EnhancedConnection enhancedConnection = (EnhancedConnection)connection.getConnection();
64-
DestinationSource destinationSource = enhancedConnection.getDestinationSource();
65-
assertNotNull(destinationSource);
66-
62+
try {
63+
try (final PooledConnection connection = (PooledConnection) pooledFactory.createConnection()) {
64+
final EnhancedConnection enhancedConnection = (EnhancedConnection) connection.getConnection();
65+
final DestinationSource destinationSource = enhancedConnection.getDestinationSource();
66+
assertNotNull(destinationSource);
67+
}
68+
} finally {
69+
pooledFactory.stop();
70+
}
6771
}
6872

6973
public void testEvictionXA() throws Exception {
70-
XaPooledConnectionFactory pooledFactory =
74+
final XaPooledConnectionFactory pooledFactory =
7175
new XaPooledConnectionFactory(new ActiveMQXAConnectionFactory("mock:(" + connector.getConnectUri() + "?closeAsync=false)?jms.xaAckMode=1"));
72-
73-
doTestEviction(pooledFactory);
76+
try {
77+
doTestEviction(pooledFactory);
78+
} finally {
79+
pooledFactory.stop();
80+
}
7481
}
7582

7683
public void testEviction() throws Exception {
77-
PooledConnectionFactory pooledFactory =
84+
final PooledConnectionFactory pooledFactory =
7885
new PooledConnectionFactory(new ActiveMQConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false"));
79-
80-
doTestEviction(pooledFactory);
86+
try {
87+
doTestEviction(pooledFactory);
88+
} finally {
89+
pooledFactory.stop();
90+
}
8191
}
8292

8393
public void doTestEviction(ConnectionFactory pooledFactory) throws Exception {
84-
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
85-
ActiveMQConnection amqC = (ActiveMQConnection) connection.getConnection();
8694
final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
87-
amqC.addTransportListener(new TransportListener() {
88-
public void onCommand(Object command) {
89-
}
90-
public void onException(IOException error) {
91-
// we know connection is dead...
92-
// listeners are fired async
93-
gotExceptionEvent.countDown();
94-
}
95-
public void transportInterupted() {
96-
}
97-
public void transportResumed() {
98-
}
99-
});
95+
try (final PooledConnection connection = (PooledConnection) pooledFactory.createConnection()) {
96+
final ActiveMQConnection amqC = (ActiveMQConnection) connection.getConnection();
97+
amqC.addTransportListener(new TransportListener() {
98+
public void onCommand(Object command) {
99+
}
100+
public void onException(IOException error) {
101+
// we know connection is dead...
102+
// listeners are fired async
103+
gotExceptionEvent.countDown();
104+
}
105+
public void transportInterupted() {
106+
}
107+
public void transportResumed() {
108+
}
109+
});
100110

101-
sendMessage(connection);
102-
LOG.info("sent one message worked fine");
103-
createConnectionFailure(connection);
104-
try {
105111
sendMessage(connection);
106-
TestCase.fail("Expected Error");
107-
} catch (JMSException e) {
108-
} finally {
109-
connection.close();
112+
LOG.info("sent one message worked fine");
113+
createConnectionFailure(connection);
114+
try {
115+
sendMessage(connection);
116+
TestCase.fail("Expected Error");
117+
} catch (JMSException e) {
118+
}
110119
}
111-
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(5, TimeUnit.SECONDS));
120+
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(15, TimeUnit.SECONDS));
112121
// If we get another connection now it should be a new connection that
113122
// works.
114123
LOG.info("expect new connection after failure");
115-
Connection connection2 = pooledFactory.createConnection();
116-
sendMessage(connection2);
124+
try (final Connection connection2 = pooledFactory.createConnection()) {
125+
sendMessage(connection2);
126+
}
117127
}
118128

119129
private void createConnectionFailure(Connection connection) throws Exception {
@@ -132,5 +142,6 @@ private void sendMessage(Connection connection) throws JMSException {
132142

133143
protected void tearDown() throws Exception {
134144
broker.stop();
145+
broker.waitUntilStopped();
135146
}
136147
}

activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,22 @@
2929
*/
3030
public class PooledConnectionFactoryTest {
3131

32-
private final Logger LOG = LoggerFactory.getLogger(PooledConnectionFactoryTest.class);
32+
private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionFactoryTest.class);
3333

3434
@Test(timeout=240000)
3535
public void testGetReference() throws Exception {
36-
PooledConnectionFactory factory = createPooledConnectionFactory();
37-
Reference ref = factory.getReference();
38-
assertNotNull(ref);
36+
final PooledConnectionFactory factory = createPooledConnectionFactory();
37+
try {
38+
final Reference ref = factory.getReference();
39+
assertNotNull(ref);
40+
} finally {
41+
factory.stop();
42+
}
3943
}
4044

4145
protected PooledConnectionFactory createPooledConnectionFactory() {
42-
PooledConnectionFactory cf = new PooledConnectionFactory(
43-
"vm://localhost?broker.persistent=false");
46+
final PooledConnectionFactory cf = new PooledConnectionFactory(
47+
"vm://pooledConnectionFactoryTest?broker.persistent=false");
4448
LOG.debug("ConnectionFactory initialized.");
4549
return cf;
4650
}

activemq-pool/src/test/java/org/apache/activemq/pool/PooledConsumerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,14 @@ public class PooledConsumerTest {
6262

6363
class PooledConsumer implements MessageListener {
6464

65-
private ConnectionFactory factory;
65+
private final PooledConnectionFactory factory;
6666
private Connection connection;
6767
public boolean done = false;
6868

6969
public PooledConsumer(String url) throws JMSException {
70-
org.apache.activemq.pool.PooledConnectionFactory factory = new org.apache.activemq.pool.PooledConnectionFactory(url);
70+
factory = new PooledConnectionFactory(url);
7171
factory.setMaxConnections(5);
7272
factory.setIdleTimeout(0);
73-
this.factory = factory;
7473
init();
7574
}
7675

@@ -140,11 +139,13 @@ public void close() {
140139
public void done() {
141140
done = true;
142141
close();
142+
factory.stop();
143143
}
144144
}
145145

146146
public void startBroker(String group, String trasport) throws Exception {
147147
brokerService = new BrokerService();
148+
brokerService.setBrokerName("pooledConsumerTest");
148149
brokerService.addConnector(trasport);
149150
brokerService.setPersistent(false);
150151
brokerService.setUseJmx(false);

0 commit comments

Comments
 (0)