22
33import com .github .dockerjava .api .command .InspectContainerResponse ;
44import lombok .SneakyThrows ;
5- import org .testcontainers .images .builder .Transferable ;
65import org .testcontainers .utility .DockerImageName ;
76
8- import java .nio .charset .StandardCharsets ;
9- import java .util .Comparator ;
10-
117/**
128 * This container wraps Confluent Kafka and Zookeeper (optionally)
139 *
@@ -17,20 +13,14 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
1713 private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName .parse ("confluentinc/cp-kafka" );
1814 private static final String DEFAULT_TAG = "5.4.3" ;
1915
20- private static final String STARTER_SCRIPT = "/testcontainers_start.sh" ;
21-
2216 public static final int KAFKA_PORT = 9093 ;
2317
2418 public static final int ZOOKEEPER_PORT = 2181 ;
2519
2620 private static final String DEFAULT_INTERNAL_TOPIC_RF = "1" ;
2721
28- private static final int PORT_NOT_ASSIGNED = -1 ;
29-
3022 protected String externalZookeeperConnect = null ;
3123
32- private int port = PORT_NOT_ASSIGNED ;
33-
3424 /**
3525 * @deprecated use {@link KafkaContainer(DockerImageName)} instead
3626 */
@@ -80,83 +70,59 @@ public KafkaContainer withExternalZookeeper(String connectString) {
8070 }
8171
8272 public String getBootstrapServers () {
83- if (port == PORT_NOT_ASSIGNED ) {
84- throw new IllegalStateException ("You should start Kafka container first" );
85- }
86- return String .format ("PLAINTEXT://%s:%s" , getHost (), port );
87- }
88-
89- @ Override
90- protected void doStart () {
91- withCommand ("sh" , "-c" , "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT );
92-
93- if (externalZookeeperConnect == null ) {
94- addExposedPort (ZOOKEEPER_PORT );
95- }
96-
97- super .doStart ();
73+ return String .format ("PLAINTEXT://%s:%s" , getHost (), getMappedPort (KAFKA_PORT ));
9874 }
9975
10076 @ Override
101- @ SneakyThrows
102- protected void containerIsStarting (InspectContainerResponse containerInfo , boolean reused ) {
103- super .containerIsStarting (containerInfo , reused );
104-
105- port = getMappedPort (KAFKA_PORT );
106-
107- if (reused ) {
108- return ;
109- }
77+ protected void configure () {
78+ withEnv (
79+ "KAFKA_ADVERTISED_LISTENERS" ,
80+ String .format (
81+ "BROKER://%s:9092" ,
82+ getNetwork () != null
83+ ? getNetworkAliases ().get (0 )
84+ : "localhost"
85+ )
86+ );
11087
11188 String command = "#!/bin/bash\n " ;
112- final String zookeeperConnect ;
11389 if (externalZookeeperConnect != null ) {
114- zookeeperConnect = externalZookeeperConnect ;
90+ withEnv ( "KAFKA_ZOOKEEPER_CONNECT" , externalZookeeperConnect ) ;
11591 } else {
116- zookeeperConnect = "localhost:" + ZOOKEEPER_PORT ;
92+ addExposedPort (ZOOKEEPER_PORT );
93+ withEnv ("KAFKA_ZOOKEEPER_CONNECT" , "localhost:" + ZOOKEEPER_PORT );
11794 command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n " ;
11895 command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n " ;
11996 command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n " ;
12097 command += "zookeeper-server-start zookeeper.properties &\n " ;
12198 }
12299
123- command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n " ;
124-
125- command += "export KAFKA_ADVERTISED_LISTENERS='" + String .join ("," , getBootstrapServers (), brokerAdvertisedListener (containerInfo )) + "'\n " ;
126-
127- command += ". /etc/confluent/docker/bash-config \n " ;
128- command += "/etc/confluent/docker/configure \n " ;
129- command += "/etc/confluent/docker/launch \n " ;
100+ // Optimization: skip the checks
101+ command += "echo '' > /etc/confluent/docker/ensure \n " ;
102+ // Run the original command
103+ command += "/etc/confluent/docker/run \n " ;
104+ withCommand ("sh" , "-c" , command );
105+ }
130106
131- copyFileToContainer (
132- Transferable .of (command .getBytes (StandardCharsets .UTF_8 ), 0777 ),
133- STARTER_SCRIPT
107+ @ Override
108+ @ SneakyThrows
109+ protected void containerIsStarted (InspectContainerResponse containerInfo ) {
110+ String brokerAdvertisedListener = brokerAdvertisedListener (containerInfo );
111+ ExecResult result = execInContainer (
112+ "kafka-configs" ,
113+ "--alter" ,
114+ "--bootstrap-server" , brokerAdvertisedListener ,
115+ "--entity-type" , "brokers" ,
116+ "--entity-name" , getEnvMap ().get ("KAFKA_BROKER_ID" ),
117+ "--add-config" ,
118+ "advertised.listeners=[" + String .join ("," , getBootstrapServers (), brokerAdvertisedListener ) + "]"
134119 );
120+ if (result .getExitCode () != 0 ) {
121+ throw new IllegalStateException (result .getStderr ());
122+ }
135123 }
136124
137125 protected String brokerAdvertisedListener (InspectContainerResponse containerInfo ) {
138- // Kafka supports only one INTER_BROKER listener, so we have to pick one.
139- // The current algorithm uses the following order of resolving the IP:
140- // 1. Custom network's IP set via `withNetwork`
141- // 2. Bridge network's IP
142- // 3. Best effort fallback to getNetworkSettings#ipAddress
143- String ipAddress = containerInfo .getNetworkSettings ().getNetworks ().entrySet ()
144- .stream ()
145- .filter (it -> it .getValue ().getIpAddress () != null )
146- .max (Comparator .comparingInt (entry -> {
147- if (getNetwork () != null && getNetwork ().getId ().equals (entry .getValue ().getNetworkID ())) {
148- return 2 ;
149- }
150-
151- if ("bridge" .equals (entry .getKey ())) {
152- return 1 ;
153- }
154-
155- return 0 ;
156- }))
157- .map (it -> it .getValue ().getIpAddress ())
158- .orElseGet (() -> containerInfo .getNetworkSettings ().getIpAddress ());
159-
160- return String .format ("BROKER://%s:%s" , ipAddress , "9092" );
126+ return String .format ("BROKER://%s:%s" , containerInfo .getConfig ().getHostName (), "9092" );
161127 }
162128}
0 commit comments