Skip to content

Commit 03190e4

Browse files
authored
MINOR: retry upon missing source topic (#20284)
Implements a timeout mechanism (using maxPollTimeMs) that waits for missing source topics to be created before failing, instead of immediately throwing exceptions in the new Streams protocol. Additionally, throw TopologyException when partition count mismatch is detected. Reviewers: Lucas Brutschy <[email protected]>, Alieh Saeedi <[email protected]>, Matthias J. Sax <[email protected]>
1 parent 1b588af commit 03190e4

File tree

3 files changed

+278
-47
lines changed

3 files changed

+278
-47
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.common.serialization.StringDeserializer;
2929
import org.apache.kafka.common.serialization.StringSerializer;
3030
import org.apache.kafka.common.utils.Utils;
31+
import org.apache.kafka.streams.GroupProtocol;
3132
import org.apache.kafka.streams.KafkaStreams;
3233
import org.apache.kafka.streams.KeyValue;
3334
import org.apache.kafka.streams.StreamsBuilder;
@@ -50,7 +51,8 @@
5051
import org.junit.jupiter.api.TestInfo;
5152
import org.junit.jupiter.api.Timeout;
5253
import org.junit.jupiter.params.ParameterizedTest;
53-
import org.junit.jupiter.params.provider.ValueSource;
54+
import org.junit.jupiter.params.provider.Arguments;
55+
import org.junit.jupiter.params.provider.MethodSource;
5456

5557
import java.io.File;
5658
import java.io.IOException;
@@ -59,6 +61,7 @@
5961
import java.util.Arrays;
6062
import java.util.Collections;
6163
import java.util.List;
64+
import java.util.Locale;
6265
import java.util.Objects;
6366
import java.util.Optional;
6467
import java.util.Properties;
@@ -71,6 +74,7 @@
7174
import java.util.regex.Pattern;
7275
import java.util.stream.Collectors;
7376
import java.util.stream.IntStream;
77+
import java.util.stream.Stream;
7478

7579
import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
7680
import static org.apache.kafka.streams.KafkaStreams.State.REBALANCING;
@@ -121,7 +125,7 @@ public void before(final TestInfo testInfo) throws InterruptedException {
121125
CLUSTER.createTopic(outputTopic, 1, 1);
122126
}
123127

124-
private Properties createStreamsConfig(final String topologyOptimization) {
128+
private Properties createStreamsConfig(final String topologyOptimization, final boolean useNewProtocol) {
125129
final Properties streamsConfiguration = new Properties();
126130
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
127131
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -131,9 +135,23 @@ private Properties createStreamsConfig(final String topologyOptimization) {
131135
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
132136
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
133137
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization);
138+
139+
if (useNewProtocol) {
140+
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
141+
}
142+
134143
return streamsConfiguration;
135144
}
136145

146+
private static Stream<Arguments> protocolAndOptimizationParameters() {
147+
return Stream.of(
148+
Arguments.of(StreamsConfig.OPTIMIZE, false), // OPTIMIZE with CLASSIC protocol
149+
Arguments.of(StreamsConfig.OPTIMIZE, true), // OPTIMIZE with STREAMS protocol
150+
Arguments.of(StreamsConfig.NO_OPTIMIZATION, false), // NO_OPTIMIZATION with CLASSIC protocol
151+
Arguments.of(StreamsConfig.NO_OPTIMIZATION, true) // NO_OPTIMIZATION with STREAMS protocol
152+
);
153+
}
154+
137155
@AfterEach
138156
public void whenShuttingDown() throws IOException {
139157
kafkaStreamsInstances.stream()
@@ -144,8 +162,8 @@ public void whenShuttingDown() throws IOException {
144162
}
145163

146164
@ParameterizedTest
147-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
148-
public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDoNotMatchSourceTopicWhenJoining(final String topologyOptimization) throws InterruptedException {
165+
@MethodSource("protocolAndOptimizationParameters")
166+
public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDoNotMatchSourceTopicWhenJoining(final String topologyOptimization, final boolean useNewProtocol) throws InterruptedException {
149167
final int topicBNumberOfPartitions = 6;
150168
final String inputTopicRepartitionName = "join-repartition-test";
151169
final AtomicReference<Throwable> expectedThrowable = new AtomicReference<>();
@@ -167,10 +185,12 @@ public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDo
167185
.join(topicBStream, (value1, value2) -> value2, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)))
168186
.to(outputTopic);
169187

170-
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization);
188+
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization, useNewProtocol);
171189
try (final KafkaStreams ks = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration)) {
172190
ks.setUncaughtExceptionHandler(exception -> {
173191
expectedThrowable.set(exception);
192+
System.out.println(String.format("[%s Protocol] Exception caught: %s",
193+
useNewProtocol ? "STREAMS" : "CLASSIC", exception.getMessage()));
174194
return SHUTDOWN_CLIENT;
175195
});
176196
ks.start();
@@ -186,8 +206,8 @@ public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDo
186206
}
187207

188208
@ParameterizedTest
189-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
190-
public void shouldDeductNumberOfPartitionsFromRepartitionOperation(final String topologyOptimization) throws Exception {
209+
@MethodSource("protocolAndOptimizationParameters")
210+
public void shouldDeductNumberOfPartitionsFromRepartitionOperation(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
191211
final String topicBMapperName = "topic-b-mapper";
192212
final int topicBNumberOfPartitions = 6;
193213
final String inputTopicRepartitionName = "join-repartition-test";
@@ -220,7 +240,7 @@ public void shouldDeductNumberOfPartitionsFromRepartitionOperation(final String
220240
.join(topicBStream, (value1, value2) -> value2, JoinWindows.of(Duration.ofSeconds(10)))
221241
.to(outputTopic);
222242

223-
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization);
243+
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization, useNewProtocol);
224244
builder.build(streamsConfiguration);
225245

226246
startStreams(builder, streamsConfiguration);
@@ -239,8 +259,8 @@ public void shouldDeductNumberOfPartitionsFromRepartitionOperation(final String
239259
}
240260

241261
@ParameterizedTest
242-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
243-
public void shouldDoProperJoiningWhenNumberOfPartitionsAreValidWhenUsingRepartitionOperation(final String topologyOptimization) throws Exception {
262+
@MethodSource("protocolAndOptimizationParameters")
263+
public void shouldDoProperJoiningWhenNumberOfPartitionsAreValidWhenUsingRepartitionOperation(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
244264
final String topicBRepartitionedName = "topic-b-scale-up";
245265
final String inputTopicRepartitionedName = "input-topic-scale-up";
246266

@@ -278,7 +298,7 @@ public void shouldDoProperJoiningWhenNumberOfPartitionsAreValidWhenUsingRepartit
278298
.join(topicBStream, (value1, value2) -> value2, JoinWindows.of(Duration.ofSeconds(10)))
279299
.to(outputTopic);
280300

281-
startStreams(builder, createStreamsConfig(topologyOptimization));
301+
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
282302

283303
assertEquals(4, getNumberOfPartitionsForTopic(toRepartitionTopicName(topicBRepartitionedName)));
284304
assertEquals(4, getNumberOfPartitionsForTopic(toRepartitionTopicName(inputTopicRepartitionedName)));
@@ -291,8 +311,8 @@ public void shouldDoProperJoiningWhenNumberOfPartitionsAreValidWhenUsingRepartit
291311
}
292312

293313
@ParameterizedTest
294-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
295-
public void shouldRepartitionToMultiplePartitions(final String topologyOptimization) throws Exception {
314+
@MethodSource("protocolAndOptimizationParameters")
315+
public void shouldRepartitionToMultiplePartitions(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
296316
final String repartitionName = "broadcasting-partitioner-test";
297317
final long timestamp = System.currentTimeMillis();
298318
final AtomicInteger partitionerInvocation = new AtomicInteger(0);
@@ -334,7 +354,7 @@ public Optional<Set<Integer>> partitions(final String topic, final Integer key,
334354
.repartition(repartitioned)
335355
.to(broadcastingOutputTopic);
336356

337-
startStreams(builder, createStreamsConfig(topologyOptimization));
357+
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
338358

339359
final String topic = toRepartitionTopicName(repartitionName);
340360

@@ -360,8 +380,8 @@ public Optional<Set<Integer>> partitions(final String topic, final Integer key,
360380

361381

362382
@ParameterizedTest
363-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
364-
public void shouldUseStreamPartitionerForRepartitionOperation(final String topologyOptimization) throws Exception {
383+
@MethodSource("protocolAndOptimizationParameters")
384+
public void shouldUseStreamPartitionerForRepartitionOperation(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
365385
final int partition = 1;
366386
final String repartitionName = "partitioner-test";
367387
final long timestamp = System.currentTimeMillis();
@@ -387,7 +407,7 @@ public void shouldUseStreamPartitionerForRepartitionOperation(final String topol
387407
.repartition(repartitioned)
388408
.to(outputTopic);
389409

390-
startStreams(builder, createStreamsConfig(topologyOptimization));
410+
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
391411

392412
final String topic = toRepartitionTopicName(repartitionName);
393413

@@ -402,8 +422,8 @@ public void shouldUseStreamPartitionerForRepartitionOperation(final String topol
402422
}
403423

404424
@ParameterizedTest
405-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
406-
public void shouldPerformSelectKeyWithRepartitionOperation(final String topologyOptimization) throws Exception {
425+
@MethodSource("protocolAndOptimizationParameters")
426+
public void shouldPerformSelectKeyWithRepartitionOperation(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
407427
final long timestamp = System.currentTimeMillis();
408428

409429
sendEvents(
@@ -421,7 +441,7 @@ public void shouldPerformSelectKeyWithRepartitionOperation(final String topology
421441
.repartition()
422442
.to(outputTopic);
423443

424-
startStreams(builder, createStreamsConfig(topologyOptimization));
444+
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
425445

426446
validateReceivedMessages(
427447
new IntegerDeserializer(),
@@ -438,8 +458,8 @@ public void shouldPerformSelectKeyWithRepartitionOperation(final String topology
438458
}
439459

440460
@ParameterizedTest
441-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
442-
public void shouldCreateRepartitionTopicIfKeyChangingOperationWasNotPerformed(final String topologyOptimization) throws Exception {
461+
@MethodSource("protocolAndOptimizationParameters")
462+
public void shouldCreateRepartitionTopicIfKeyChangingOperationWasNotPerformed(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
443463
final String repartitionName = "dummy";
444464
final long timestamp = System.currentTimeMillis();
445465

@@ -457,7 +477,7 @@ public void shouldCreateRepartitionTopicIfKeyChangingOperationWasNotPerformed(fi
457477
.repartition(Repartitioned.as(repartitionName))
458478
.to(outputTopic);
459479

460-
startStreams(builder, createStreamsConfig(topologyOptimization));
480+
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
461481

462482
validateReceivedMessages(
463483
new IntegerDeserializer(),
@@ -475,8 +495,8 @@ public void shouldCreateRepartitionTopicIfKeyChangingOperationWasNotPerformed(fi
475495
}
476496

477497
@ParameterizedTest
478-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
479-
public void shouldPerformKeySelectOperationWhenRepartitionOperationIsUsedWithKeySelector(final String topologyOptimization) throws Exception {
498+
@MethodSource("protocolAndOptimizationParameters")
499+
public void shouldPerformKeySelectOperationWhenRepartitionOperationIsUsedWithKeySelector(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
480500
final String repartitionedName = "new-key";
481501
final long timestamp = System.currentTimeMillis();
482502

@@ -501,7 +521,7 @@ public void shouldPerformKeySelectOperationWhenRepartitionOperationIsUsedWithKey
501521
.toStream()
502522
.to(outputTopic);
503523

504-
startStreams(builder, createStreamsConfig(topologyOptimization));
524+
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
505525

506526
validateReceivedMessages(
507527
new StringDeserializer(),
@@ -521,8 +541,8 @@ public void shouldPerformKeySelectOperationWhenRepartitionOperationIsUsedWithKey
521541
}
522542

523543
@ParameterizedTest
524-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
525-
public void shouldCreateRepartitionTopicWithSpecifiedNumberOfPartitions(final String topologyOptimization) throws Exception {
544+
@MethodSource("protocolAndOptimizationParameters")
545+
public void shouldCreateRepartitionTopicWithSpecifiedNumberOfPartitions(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
526546
final String repartitionName = "new-partitions";
527547
final long timestamp = System.currentTimeMillis();
528548

@@ -543,7 +563,7 @@ public void shouldCreateRepartitionTopicWithSpecifiedNumberOfPartitions(final St
543563
.toStream()
544564
.to(outputTopic);
545565

546-
startStreams(builder, createStreamsConfig(topologyOptimization));
566+
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
547567

548568
validateReceivedMessages(
549569
new IntegerDeserializer(),
@@ -561,8 +581,8 @@ public void shouldCreateRepartitionTopicWithSpecifiedNumberOfPartitions(final St
561581
}
562582

563583
@ParameterizedTest
564-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
565-
public void shouldInheritRepartitionTopicPartitionNumberFromUpstreamTopicWhenNumberOfPartitionsIsNotSpecified(final String topologyOptimization) throws Exception {
584+
@MethodSource("protocolAndOptimizationParameters")
585+
public void shouldInheritRepartitionTopicPartitionNumberFromUpstreamTopicWhenNumberOfPartitionsIsNotSpecified(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
566586
final String repartitionName = "new-topic";
567587
final long timestamp = System.currentTimeMillis();
568588

@@ -583,7 +603,7 @@ public void shouldInheritRepartitionTopicPartitionNumberFromUpstreamTopicWhenNum
583603
.toStream()
584604
.to(outputTopic);
585605

586-
startStreams(builder, createStreamsConfig(topologyOptimization));
606+
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
587607

588608
validateReceivedMessages(
589609
new IntegerDeserializer(),
@@ -601,8 +621,8 @@ public void shouldInheritRepartitionTopicPartitionNumberFromUpstreamTopicWhenNum
601621
}
602622

603623
@ParameterizedTest
604-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
605-
public void shouldCreateOnlyOneRepartitionTopicWhenRepartitionIsFollowedByGroupByKey(final String topologyOptimization) throws Exception {
624+
@MethodSource("protocolAndOptimizationParameters")
625+
public void shouldCreateOnlyOneRepartitionTopicWhenRepartitionIsFollowedByGroupByKey(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
606626
final String repartitionName = "new-partitions";
607627
final long timestamp = System.currentTimeMillis();
608628

@@ -629,7 +649,7 @@ public void shouldCreateOnlyOneRepartitionTopicWhenRepartitionIsFollowedByGroupB
629649
.toStream()
630650
.to(outputTopic);
631651

632-
startStreams(builder, createStreamsConfig(topologyOptimization));
652+
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
633653

634654
final String topology = builder.build().describe().toString();
635655

@@ -647,8 +667,8 @@ public void shouldCreateOnlyOneRepartitionTopicWhenRepartitionIsFollowedByGroupB
647667
}
648668

649669
@ParameterizedTest
650-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
651-
public void shouldGenerateRepartitionTopicWhenNameIsNotSpecified(final String topologyOptimization) throws Exception {
670+
@MethodSource("protocolAndOptimizationParameters")
671+
public void shouldGenerateRepartitionTopicWhenNameIsNotSpecified(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
652672
final long timestamp = System.currentTimeMillis();
653673

654674
sendEvents(
@@ -666,7 +686,7 @@ public void shouldGenerateRepartitionTopicWhenNameIsNotSpecified(final String to
666686
.repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
667687
.to(outputTopic);
668688

669-
startStreams(builder, createStreamsConfig(topologyOptimization));
689+
startStreams(builder, createStreamsConfig(topologyOptimization, useNewProtocol));
670690

671691
validateReceivedMessages(
672692
new StringDeserializer(),
@@ -683,8 +703,8 @@ public void shouldGenerateRepartitionTopicWhenNameIsNotSpecified(final String to
683703
}
684704

685705
@ParameterizedTest
686-
@ValueSource(strings = {StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION})
687-
public void shouldGoThroughRebalancingCorrectly(final String topologyOptimization) throws Exception {
706+
@MethodSource("protocolAndOptimizationParameters")
707+
public void shouldGoThroughRebalancingCorrectly(final String topologyOptimization, final boolean useNewProtocol) throws Exception {
688708
final String repartitionName = "rebalancing-test";
689709
final long timestamp = System.currentTimeMillis();
690710

@@ -711,7 +731,7 @@ public void shouldGoThroughRebalancingCorrectly(final String topologyOptimizatio
711731
.toStream()
712732
.to(outputTopic);
713733

714-
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization);
734+
final Properties streamsConfiguration = createStreamsConfig(topologyOptimization, useNewProtocol);
715735
startStreams(builder, streamsConfiguration);
716736
final Properties streamsToCloseConfigs = new Properties();
717737
streamsToCloseConfigs.putAll(streamsConfiguration);

0 commit comments

Comments
 (0)