Skip to content

Commit c82284c

Browse files
committed
CAMEL-10873: camel-sjms transacted routes dead-lock when exceptions are thrown by asynchronous processors. Thanks to Daniele Fognini for test case.
1 parent cc704cd commit c82284c

File tree

3 files changed

+131
-21
lines changed

3 files changed

+131
-21
lines changed

components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,15 @@ public void onMessage(Message message) {
7272

7373
log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
7474

75-
if (isTransacted() && synchronization != null) {
76-
exchange.addOnCompletion(synchronization);
77-
}
7875
try {
7976
if (isTransacted() || isSynchronous()) {
8077
log.debug("Handling synchronous message: {}", exchange.getIn().getBody());
8178
handleMessage(exchange);
79+
if (exchange.isFailed()) {
80+
synchronization.onFailure(exchange);
81+
} else {
82+
synchronization.onComplete(exchange);
83+
}
8284
} else {
8385
log.debug("Handling asynchronous message: {}", exchange.getIn().getBody());
8486
executor.execute(new Runnable() {

components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@
2020

2121
import org.apache.camel.Exchange;
2222
import org.apache.camel.component.sjms.TransactionCommitStrategy;
23-
import org.apache.camel.spi.Synchronization;
23+
import org.apache.camel.support.SynchronizationAdapter;
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626

2727
/**
28-
* SessionTransactionSynchronization is called at the completion of each {@link org.apache.camel.Exhcnage}.
28+
* SessionTransactionSynchronization is called at the completion of each {@link org.apache.camel.Exchange}.
29+
* <p/>
30+
* The commit or rollback on the {@link Session} must be performed from the same thread that consumed the message.
2931
*/
30-
public class SessionTransactionSynchronization implements Synchronization {
31-
private Logger log = LoggerFactory.getLogger(getClass());
32-
private Session session;
32+
public class SessionTransactionSynchronization extends SynchronizationAdapter {
33+
private static final Logger LOG = LoggerFactory.getLogger(SessionTransactionSynchronization.class);
34+
35+
private final Session session;
3336
private final TransactionCommitStrategy commitStrategy;
3437

3538
public SessionTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy) {
@@ -41,42 +44,39 @@ public SessionTransactionSynchronization(Session session, TransactionCommitStrat
4144
}
4245
}
4346

44-
/**
45-
* @param exchange
46-
* @see org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange)
47-
*/
47+
4848
@Override
4949
public void onFailure(Exchange exchange) {
5050
try {
5151
if (commitStrategy.rollback(exchange)) {
52-
log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
52+
LOG.debug("Processing failure of Exchange id: {}", exchange.getExchangeId());
5353
if (session != null && session.getTransacted()) {
5454
session.rollback();
5555
}
5656
}
5757
} catch (Exception e) {
58-
log.warn("Failed to rollback the session: {}", e.getMessage());
58+
LOG.warn("Failed to rollback the session: {}", e.getMessage());
5959
}
6060
}
6161

62-
/**
63-
* @param exchange
64-
* @see org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange
65-
*)
66-
*/
6762
@Override
6863
public void onComplete(Exchange exchange) {
6964
try {
7065
if (commitStrategy.commit(exchange)) {
71-
log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
66+
LOG.debug("Processing completion of Exchange id: {}", exchange.getExchangeId());
7267
if (session != null && session.getTransacted()) {
7368
session.commit();
7469
}
7570
}
7671
} catch (Exception e) {
77-
log.warn("Failed to commit the session: {}", e.getMessage());
72+
LOG.warn("Failed to commit the session: {}", e.getMessage());
7873
exchange.setException(e);
7974
}
8075
}
8176

77+
@Override
78+
public boolean allowHandover() {
79+
// must not handover as we should be synchronous
80+
return false;
81+
}
8282
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.component.sjms.tx;
18+
19+
import java.lang.management.ManagementFactory;
20+
import java.lang.management.ThreadInfo;
21+
import java.lang.management.ThreadMXBean;
22+
import java.util.Arrays;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.stream.Collectors;
26+
27+
import org.apache.activemq.ActiveMQConnectionFactory;
28+
import org.apache.camel.CamelContext;
29+
import org.apache.camel.builder.RouteBuilder;
30+
import org.apache.camel.component.mock.MockEndpoint;
31+
import org.apache.camel.component.sjms.SjmsComponent;
32+
import org.apache.camel.test.junit4.CamelTestSupport;
33+
import org.junit.Test;
34+
35+
public class TransactedAsyncExceptionTest extends CamelTestSupport {
36+
37+
private static final String BROKER_URI = "vm://tqc_test_broker?broker.persistent=false&broker.useJmx=false";
38+
39+
private static final int TRANSACTION_REDELIVERY_COUNT = 10;
40+
41+
@Test
42+
public void testRouteWithThread() throws Exception {
43+
String destination = "sjms:queue:async.exception";
44+
45+
context.addRoutes(new RouteBuilder() {
46+
@Override
47+
public void configure() throws Exception {
48+
AtomicInteger counter = new AtomicInteger();
49+
50+
from(destination + "?acknowledgementMode=SESSION_TRANSACTED&transacted=true")
51+
.threads()
52+
.process(exchange -> {
53+
if (counter.incrementAndGet() < TRANSACTION_REDELIVERY_COUNT) {
54+
throw new IllegalArgumentException();
55+
}
56+
})
57+
.to("mock:async.exception");
58+
}
59+
});
60+
61+
template.sendBody(destination, "begin");
62+
63+
MockEndpoint mockEndpoint = context.getEndpoint("mock:async.exception", MockEndpoint.class);
64+
65+
mockEndpoint.expectedMessageCount(1);
66+
if (!mockEndpoint.await(getShutdownTimeout(), TimeUnit.SECONDS)) {
67+
dumpThreads();
68+
}
69+
assertMockEndpointsSatisfied(getShutdownTimeout(), TimeUnit.SECONDS);
70+
}
71+
72+
private void dumpThreads() {
73+
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
74+
for (ThreadInfo threadInfo : threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), Integer.MAX_VALUE)) {
75+
if (Thread.State.BLOCKED.equals(threadInfo.getThreadState())) {
76+
log.error("blocked thread: {}", threadInfo);
77+
} else {
78+
log.info("normal thread: {}", threadInfo);
79+
}
80+
log.info("full stack: {}", Arrays.stream(threadInfo.getStackTrace()).map(Object::toString).collect(Collectors.joining("\n\t")));
81+
}
82+
}
83+
84+
@Override
85+
protected CamelContext createCamelContext() throws Exception {
86+
CamelContext camelContext = super.createCamelContext();
87+
88+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URI);
89+
90+
connectionFactory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
91+
connectionFactory.getRedeliveryPolicy().setRedeliveryDelay(0);
92+
connectionFactory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
93+
connectionFactory.getRedeliveryPolicy().setUseExponentialBackOff(false);
94+
connectionFactory.getRedeliveryPolicy().setMaximumRedeliveries(TRANSACTION_REDELIVERY_COUNT);
95+
96+
SjmsComponent component = new SjmsComponent();
97+
component.setConnectionFactory(connectionFactory);
98+
camelContext.addComponent("sjms", component);
99+
100+
return camelContext;
101+
}
102+
103+
@Override
104+
protected int getShutdownTimeout() {
105+
return 2;
106+
}
107+
108+
}

0 commit comments

Comments
 (0)