Skip to content

Commit 750c8e4

Browse files
committed
CAMEL-12441: Fixed splitter in parallel and streaming mode may block if iterator next throws exception on first call. Thanks to Bodor Maria Mihaela for reporting and reproducer sample project.
1 parent 06032e7 commit 750c8e4

File tree

2 files changed

+115
-0
lines changed

2 files changed

+115
-0
lines changed

camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,10 @@ public Exchange call() throws Exception {
361361
} else {
362362
executionException.set(ObjectHelper.wrapRuntimeCamelException(e));
363363
}
364+
// and because of the exception we must signal we are done so the latch can open and let the other thread continue processing
365+
LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
366+
LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
367+
aggregationOnTheFlyDone.countDown();
364368
}
365369

366370
// signal all tasks has been submitted
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.issues;
18+
19+
import java.util.Iterator;
20+
import java.util.function.Consumer;
21+
22+
import org.apache.camel.ContextTestSupport;
23+
import org.apache.camel.builder.RouteBuilder;
24+
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
25+
26+
/**
27+
* Tests the issue stated in
28+
* <a href="https://issues.apache.org/jira/browse/CAMEL-12441">CAMEL-12441</a>.
29+
*/
30+
public class SplitterParallelWithIteratorThrowingExceptionTest extends ContextTestSupport {
31+
32+
public void testIteratorThrowExceptionOnFirst() throws Exception {
33+
getMockEndpoint("mock:line").expectedMessageCount(0);
34+
getMockEndpoint("mock:end").expectedMessageCount(0);
35+
36+
try {
37+
template.sendBody("direct:start", new MyIterator(1));
38+
fail("Should throw exception");
39+
} catch (Exception e) {
40+
IllegalArgumentException iae = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
41+
assertEquals("Forced error", iae.getMessage());
42+
}
43+
44+
assertMockEndpointsSatisfied();
45+
}
46+
47+
public void testIteratorThrowExceptionOnSecond() throws Exception {
48+
getMockEndpoint("mock:line").expectedMessageCount(1);
49+
getMockEndpoint("mock:end").expectedMessageCount(0);
50+
51+
try {
52+
template.sendBody("direct:start", new MyIterator(0));
53+
fail("Should throw exception");
54+
} catch (Exception e) {
55+
IllegalArgumentException iae = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
56+
assertEquals("Forced error", iae.getMessage());
57+
}
58+
59+
assertMockEndpointsSatisfied();
60+
}
61+
62+
@Override
63+
protected RouteBuilder createRouteBuilder() throws Exception {
64+
return new RouteBuilder() {
65+
@Override
66+
public void configure() throws Exception {
67+
from("direct:start").
68+
split(body())
69+
.aggregationStrategy(new UseLatestAggregationStrategy())
70+
.streaming().stopOnException().parallelProcessing().parallelAggregate()
71+
.to("mock:line")
72+
.end()
73+
.to("mock:end");
74+
}
75+
};
76+
}
77+
78+
public static class MyIterator implements Iterator<String> {
79+
80+
public MyIterator(int count) {
81+
this.count = count;
82+
}
83+
84+
private int count;
85+
86+
@Override
87+
public boolean hasNext() {
88+
return count < 2;
89+
}
90+
91+
@Override
92+
public String next() {
93+
count++;
94+
if (count == 1) {
95+
return "Hello";
96+
} else {
97+
throw new IllegalArgumentException("Forced error");
98+
}
99+
}
100+
101+
@Override
102+
public void remove() {
103+
// noop
104+
}
105+
106+
@Override
107+
public void forEachRemaining(Consumer<? super String> action) {
108+
// noop
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)