
Commit 625ddc3

bsideup and seglo authored
Kafka cluster example (#1984, #3758)
Co-authored-by: Sean Glover <[email protected]>
1 parent 73da361 commit 625ddc3

6 files changed: 239 additions & 2 deletions

examples/kafka-cluster/build.gradle

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
plugins {
    id 'java'
}

repositories {
    jcenter()
}

dependencies {
    testCompileOnly "org.projectlombok:lombok:1.18.10"
    testAnnotationProcessor "org.projectlombok:lombok:1.18.10"
    testCompile 'org.testcontainers:kafka'
    testCompile 'org.apache.kafka:kafka-clients:2.3.1'
    testCompile 'org.assertj:assertj-core:3.14.0'
    testCompile 'com.google.guava:guava:23.0'
    testCompile 'org.slf4j:slf4j-simple:1.7.30'
}
examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
package com.example.kafkacluster;

import lombok.SneakyThrows;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * Provides an easy way to launch a Kafka cluster with multiple brokers.
 */
public class KafkaContainerCluster implements Startable {

    private final int brokersNum;
    private final Network network;
    private final GenericContainer<?> zookeeper;
    private final Collection<KafkaContainer> brokers;

    public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
        if (brokersNum <= 0) {
            throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
        }
        if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) {
            throw new IllegalArgumentException("internalTopicsRf '" + internalTopicsRf + "' must be greater than 0 and not greater than brokersNum");
        }

        this.brokersNum = brokersNum;
        this.network = Network.newNetwork();

        this.zookeeper = new GenericContainer<>(DockerImageName.parse("confluentinc/cp-zookeeper").withTag(confluentPlatformVersion))
            .withNetwork(network)
            .withNetworkAliases("zookeeper")
            .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(KafkaContainer.ZOOKEEPER_PORT));

        this.brokers = IntStream
            .range(0, this.brokersNum)
            .mapToObj(brokerNum -> {
                return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag(confluentPlatformVersion))
                    .withNetwork(this.network)
                    .withNetworkAliases("broker-" + brokerNum)
                    .dependsOn(this.zookeeper)
                    .withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT)
                    .withEnv("KAFKA_BROKER_ID", brokerNum + "")
                    .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
                    .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "");
            })
            .collect(Collectors.toList());
    }

    public Collection<KafkaContainer> getBrokers() {
        return this.brokers;
    }

    public String getBootstrapServers() {
        return brokers.stream()
            .map(KafkaContainer::getBootstrapServers)
            .collect(Collectors.joining(","));
    }

    private Stream<GenericContainer<?>> allContainers() {
        return Stream.concat(
            this.brokers.stream(),
            Stream.of(this.zookeeper)
        );
    }

    @Override
    @SneakyThrows
    public void start() {
        Stream<Startable> startables = this.brokers.stream().map(Startable.class::cast);
        Startables.deepStart(startables).get(60, SECONDS);

        Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {
            Container.ExecResult result = this.zookeeper.execInContainer(
                "sh", "-c",
                "zookeeper-shell zookeeper:" + KafkaContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1"
            );
            String brokers = result.getStdout();

            return brokers != null && brokers.split(",").length == this.brokersNum;
        });
    }

    @Override
    public void stop() {
        allContainers().parallel().forEach(GenericContainer::stop);
    }
}
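For orientation, a minimal usage sketch; the "5.2.1" tag and the 3-brokers/RF-2 arguments simply mirror the test below. Since Startable extends AutoCloseable, try-with-resources stops every container on exit:

// Sketch: 3 brokers, internal topics replicated across 2 of them.
try (KafkaContainerCluster cluster = new KafkaContainerCluster("5.2.1", 3, 2)) {
    cluster.start();
    // Comma-separated list covering all brokers, ready for any Kafka client config.
    String bootstrapServers = cluster.getBootstrapServers();
}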
examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
package com.example.kafkacluster;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

public class KafkaContainerClusterTest {

    @Test
    public void testKafkaContainerCluster() throws Exception {
        try (
            KafkaContainerCluster cluster = new KafkaContainerCluster("5.2.1", 3, 2)
        ) {
            cluster.start();
            String bootstrapServers = cluster.getBootstrapServers();

            assertThat(cluster.getBrokers()).hasSize(3);

            testKafkaFunctionality(bootstrapServers, 3, 2);
        }
    }

    protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
        try (
            AdminClient adminClient = AdminClient.create(ImmutableMap.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers
            ));

            KafkaProducer<String, String> producer = new KafkaProducer<>(
                ImmutableMap.of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                    ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
                ),
                new StringSerializer(),
                new StringSerializer()
            );

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
                ImmutableMap.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                    ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(),
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
                ),
                new StringDeserializer(),
                new StringDeserializer()
            );
        ) {
            String topicName = "messages";

            Collection<NewTopic> topics = Collections.singletonList(new NewTopic(topicName, partitions, (short) rf));
            adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

            consumer.subscribe(Collections.singletonList(topicName));

            producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

            Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                if (records.isEmpty()) {
                    return false;
                }

                assertThat(records)
                    .hasSize(1)
                    .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
                    .containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

                return true;
            });

            consumer.unsubscribe();
        }
    }
}
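If one also wanted to assert that the requested layout took effect, a hedged extension of testKafkaFunctionality (not part of this commit) could describe the topic through the same AdminClient; TopicDescription comes from org.apache.kafka.clients.admin:

// Sketch: verify partition count and replication factor of the freshly created topic.
TopicDescription description = adminClient
    .describeTopics(Collections.singletonList(topicName))
    .all().get(30, TimeUnit.SECONDS)
    .get(topicName);

assertThat(description.partitions()).hasSize(partitions);
assertThat(description.partitions().get(0).replicas()).hasSize(rf);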

examples/settings.gradle

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ includeBuild '..'
 
 // explicit include to allow Dependabot to autodiscover subprojects
 include 'disque-job-queue'
+include 'kafka-cluster'
 include 'linked-container'
 include 'mongodb-container'
 include 'redis-backed-cache'

modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java

Lines changed: 6 additions & 2 deletions
@@ -24,6 +24,8 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
 
     public static final int ZOOKEEPER_PORT = 2181;
 
+    private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
+
     private static final int PORT_NOT_ASSIGNED = -1;
 
     protected String externalZookeeperConnect = null;

@@ -60,8 +62,10 @@ public KafkaContainer(final DockerImageName dockerImageName) {
         withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
 
         withEnv("KAFKA_BROKER_ID", "1");
-        withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
-        withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
+        withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
         withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
         withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
     }
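The two new KAFKA_TRANSACTION_STATE_LOG_* defaults should allow transactional clients to work against a single-broker container without extra configuration (Kafka's built-in default replication factor of 3 for the transaction state log cannot be satisfied by one broker). Because these are ordinary environment variables, a test can still override them; a hypothetical sketch (the 5.2.1 tag is only an example):

// Sketch: overriding the internal-topic defaults on a standalone KafkaContainer.
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"))
    .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");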

modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java

Lines changed: 16 additions & 0 deletions
@@ -6,8 +6,12 @@
 
 import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;

@@ -110,7 +114,15 @@ public void testConfluentPlatformVersion6() throws Exception {
     }
 
     protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
+        testKafkaFunctionality(bootstrapServers, 1, 1);
+    }
+
+    protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
         try (
+            AdminClient adminClient = AdminClient.create(ImmutableMap.of(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers
+            ));
+
             KafkaProducer<String, String> producer = new KafkaProducer<>(
                 ImmutableMap.of(
                     ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,

@@ -131,6 +143,10 @@ protected void testKafkaFunctionality(String bootstrapServers) throws Exception
             );
         ) {
             String topicName = "messages-" + UUID.randomUUID();
+
+            Collection<NewTopic> topics = singletonList(new NewTopic(topicName, partitions, (short) rf));
+            adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
+
             consumer.subscribe(singletonList(topicName));
 
             producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
