Skip to content

Commit 45ae825

Browse files
committed
Simplify KafkaContainerCluster#start
1 parent 3ec936c commit 45ae825

File tree

2 files changed

+11
-21
lines changed

2 files changed

+11
-21
lines changed

examples/kafka-cluster/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ dependencies {
1313
compile 'org.apache.kafka:kafka-clients:2.3.1'
1414
testCompile 'org.assertj:assertj-core:3.14.0'
1515
testCompile 'com.google.guava:guava:23.0'
16+
testCompile 'org.slf4j:slf4j-simple:+'
1617
}

examples/kafka-cluster/src/main/java/com/example/kafkacluster/KafkaContainerCluster.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
package com.example.kafkacluster;
22

3-
import com.github.dockerjava.api.DockerClient;
4-
import com.github.dockerjava.core.command.ExecStartResultCallback;
53
import lombok.SneakyThrows;
64
import org.rnorth.ducttape.unreliables.Unreliables;
7-
import org.testcontainers.DockerClientFactory;
5+
import org.testcontainers.containers.Container;
86
import org.testcontainers.containers.GenericContainer;
97
import org.testcontainers.containers.Network;
108
import org.testcontainers.lifecycle.Startable;
119
import org.testcontainers.lifecycle.Startables;
1210
import org.testcontainers.containers.KafkaContainer;
1311

14-
import java.io.ByteArrayOutputStream;
1512
import java.util.Collection;
1613
import java.util.concurrent.TimeUnit;
1714
import java.util.stream.Collectors;
@@ -30,7 +27,6 @@ public class KafkaContainerCluster implements Startable {
3027
private final Network network;
3128
private final GenericContainer zookeeper;
3229
private final Collection<KafkaContainer> brokers;
33-
private final DockerClient dockerClient = DockerClientFactory.instance().client();
3430

3531
public KafkaContainerCluster(int brokersNum, int internalTopicsRf) {
3632
this(CONFLUENT_PLATFORM_VERSION, brokersNum, internalTopicsRf);
@@ -96,25 +92,18 @@ private Stream<GenericContainer> allContainers() {
9692
@Override
9793
@SneakyThrows
9894
public void start() {
99-
Stream<Startable> startables = this.brokers.stream().map(b -> (Startable) b);
95+
Stream<Startable> startables = this.brokers.stream().map(Startable.class::cast);
10096
Startables.deepStart(startables).get(60, SECONDS);
10197

102-
Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> Stream.of(this.zookeeper)
103-
.map(this::clusterBrokers)
104-
.anyMatch(brokers -> brokers.split(",").length == this.brokersNum));
105-
}
98+
Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {
99+
Container.ExecResult result = this.zookeeper.execInContainer(
100+
"sh", "-c",
101+
"zookeeper-shell zookeeper:" + KafkaContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1"
102+
);
103+
String brokers = result.getStdout();
106104

107-
@SneakyThrows
108-
private String clusterBrokers(GenericContainer c) {
109-
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
110-
dockerClient
111-
.execStartCmd(
112-
dockerClient.execCreateCmd(c.getContainerId()).withAttachStdout(true)
113-
.withCmd("sh", "-c", "zookeeper-shell zookeeper:" + KafkaContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1").exec().getId()
114-
)
115-
.exec(new ExecStartResultCallback(outputStream, null))
116-
.awaitCompletion();
117-
return outputStream.toString();
105+
return brokers != null && brokers.split(",").length == this.brokersNum;
106+
});
118107
}
119108

120109
@Override

0 commit comments

Comments
 (0)