Skip to content

Commit 55c7c15

Browse files
authored
Merge pull request #45947 from ozangunalp/kafka3_3.9.0
2 parents 55355ad + 9124fb1 commit 55c7c15

File tree

4 files changed

+78
-9
lines changed

4 files changed

+78
-9
lines changed

bom/application/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@
138138
<jboss-logging.version>3.6.1.Final</jboss-logging.version>
139139
<mutiny.version>2.8.0</mutiny.version>
140140
<jctools-core.version>4.0.5</jctools-core.version>
141-
<kafka3.version>3.7.2</kafka3.version>
141+
<kafka3.version>3.9.0</kafka3.version>
142142
<lz4.version>1.8.0</lz4.version> <!-- dependency of the kafka-clients that could be overridden by other imported BOMs in the platform -->
143143
<snappy.version>1.1.10.5</snappy.version>
144144
<strimzi-test-container.version>0.109.1</strimzi-test-container.version>

extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
1010
import org.apache.kafka.streams.StreamsConfig;
1111
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
12+
import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
1213
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
14+
import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
1315
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
1416
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
1517
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
@@ -82,10 +84,15 @@ private void registerCompulsoryClasses(BuildProducer<ReflectiveClassBuildItem> r
8284
.build());
8385
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(
8486
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor.class,
85-
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor.class,
87+
org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor.class,
8688
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor.class)
8789
.reason(getClass().getName())
8890
.methods().fields().build());
91+
// for backwards compatibility with < Kafka 3.9.0
92+
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(
93+
"org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor")
94+
.reason(getClass().getName())
95+
.methods().fields().build());
8996
// See https://github.com/quarkusio/quarkus/issues/23404
9097
reflectiveClasses.produce(ReflectiveClassBuildItem
9198
.builder("org.apache.kafka.streams.processor.internals.StateDirectory$StateDirectoryProcessFile")
@@ -98,12 +105,19 @@ private void registerCompulsoryClasses(BuildProducer<ReflectiveClassBuildItem> r
98105
org.apache.kafka.streams.state.BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class)
99106
.reason(getClass().getName())
100107
.build());
108+
reflectiveClasses.produce(ReflectiveClassBuildItem
109+
.builder(org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler.class,
110+
org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler.class)
111+
.reason(getClass().getName())
112+
.methods().fields().build());
101113
}
102114

103115
private void registerClassesThatClientMaySpecify(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses,
104116
LaunchModeBuildItem launchMode) {
105117
Properties properties = buildKafkaStreamsProperties(launchMode.getLaunchMode());
106-
registerExceptionHandler(reflectiveClasses, properties);
118+
registerDeserializationExceptionHandler(reflectiveClasses, properties);
119+
registerProcessingExceptionHandler(reflectiveClasses, properties);
120+
registerProductionExceptionHandler(reflectiveClasses, properties);
107121
registerDefaultSerdes(reflectiveClasses, properties);
108122
registerDslStoreSupplier(reflectiveClasses, properties);
109123
}
@@ -121,13 +135,15 @@ private void registerDslStoreSupplier(BuildProducer<ReflectiveClassBuildItem> re
121135
}
122136
}
123137

124-
private void registerExceptionHandler(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses,
138+
private void registerDeserializationExceptionHandler(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses,
125139
Properties kafkaStreamsProperties) {
126140
String exceptionHandlerClassName = kafkaStreamsProperties
127141
.getProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG);
128142

129143
if (exceptionHandlerClassName == null) {
130-
registerDefaultExceptionHandler(reflectiveClasses);
144+
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(LogAndFailExceptionHandler.class)
145+
.reason(getClass().getName())
146+
.build());
131147
} else {
132148
reflectiveClasses.produce(
133149
ReflectiveClassBuildItem.builder(exceptionHandlerClassName)
@@ -136,10 +152,41 @@ private void registerExceptionHandler(BuildProducer<ReflectiveClassBuildItem> re
136152
}
137153
}
138154

139-
private void registerDefaultExceptionHandler(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses) {
140-
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(LogAndFailExceptionHandler.class)
141-
.reason(getClass().getName())
142-
.build());
155+
private void registerProcessingExceptionHandler(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses,
156+
Properties kafkaStreamsProperties) {
157+
String processingExceptionHandlerClassName = kafkaStreamsProperties
158+
.getProperty(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG);
159+
160+
if (processingExceptionHandlerClassName == null) {
161+
reflectiveClasses.produce(
162+
ReflectiveClassBuildItem.builder(LogAndFailProcessingExceptionHandler.class,
163+
LogAndContinueProcessingExceptionHandler.class)
164+
.reason(getClass().getName())
165+
.build());
166+
} else {
167+
reflectiveClasses.produce(
168+
ReflectiveClassBuildItem.builder(processingExceptionHandlerClassName)
169+
.reason(getClass().getName())
170+
.build());
171+
}
172+
}
173+
174+
private void registerProductionExceptionHandler(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses,
175+
Properties kafkaStreamsProperties) {
176+
String productionExceptionHandlerClassName = kafkaStreamsProperties
177+
.getProperty(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG);
178+
179+
if (productionExceptionHandlerClassName == null) {
180+
reflectiveClasses.produce(
181+
ReflectiveClassBuildItem.builder(DefaultProductionExceptionHandler.class)
182+
.reason(getClass().getName())
183+
.build());
184+
} else {
185+
reflectiveClasses.produce(
186+
ReflectiveClassBuildItem.builder(productionExceptionHandlerClassName)
187+
.reason(getClass().getName())
188+
.build());
189+
}
143190
}
144191

145192
private void registerDefaultSerdes(BuildProducer<ReflectiveClassBuildItem> reflectiveClasses,
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.quarkus.it.kafka.streams;
2+
3+
import java.util.Map;
4+
5+
import org.apache.kafka.streams.errors.ErrorHandlerContext;
6+
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
7+
import org.apache.kafka.streams.processor.api.Record;
8+
9+
public class CustomExceptionHandler implements ProcessingExceptionHandler {
10+
11+
@Override
12+
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
13+
return ProcessingHandlerResponse.CONTINUE;
14+
}
15+
16+
@Override
17+
public void configure(Map<String, ?> map) {
18+
}
19+
}

integration-tests/kafka-streams/src/main/resources/application.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,6 @@ quarkus.kafka.devservices.enabled=false
3232

3333
# Enable health checks
3434
quarkus.kafka-streams.health.enabled=true
35+
36+
# Configure the exception handler
37+
kafka-streams.processing.exception.handler=io.quarkus.it.kafka.streams.CustomExceptionHandler

0 commit comments

Comments
 (0)