Skip to content

Commit d48bab7

Browse files
eddumelendezkiview
andauthored
Allow KafkaContainer to register additional listeners (#7333)
Allow to register additional listeners. E.g. when using along with Toxiproxy, Schema Registry, Kafka Connect, KCat. Listeners's host will register as a network aliases. --------- Co-authored-by: Kevin Wittek <[email protected]>
1 parent ea0b163 commit d48bab7

File tree

3 files changed

+129
-24
lines changed

3 files changed

+129
-24
lines changed

docs/modules/kafka.md

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,27 @@ KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)"
4242

4343
See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details.
4444

45-
## Multi-container usage
45+
## Register listeners
4646

47-
If your test needs to run some other Docker container which needs access to Kafka, do the following:
47+
There are scenarios where additional listeners are needed because the consumer/producer can be in another
48+
container in the same network or a different process where the port to connect differs from the default
49+
exposed port `9093`. E.g [Toxiproxy](../../docs/modules/toxiproxy.md).
4850

49-
* Run your other container on the same network as Kafka container, e.g.:
5051
<!--codeinclude-->
51-
[Network](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKafkaNetwork
52+
[Register additional listener](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:registerListener
5253
<!--/codeinclude-->
53-
* Use `kafka.getNetworkAliases().get(0)+":9092"` as bootstrap server location.
54-
Or just give your Kafka container a network alias of your liking.
5554

56-
You will need to explicitly create a network and set it on the Kafka container as well as on your other containers that need to communicate with Kafka.
55+
Container defined in the same network:
56+
57+
<!--codeinclude-->
58+
[Create kcat container](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:createKCatContainer
59+
<!--/codeinclude-->
60+
61+
Client using the new registered listener:
62+
63+
<!--codeinclude-->
64+
[Produce/Consume via new listener](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:produceConsumeMessage
65+
<!--/codeinclude-->
5766

5867
## Adding this module to your project dependencies
5968

modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66
import org.testcontainers.utility.ComparableVersion;
77
import org.testcontainers.utility.DockerImageName;
88

9+
import java.util.ArrayList;
10+
import java.util.HashSet;
11+
import java.util.List;
912
import java.util.Objects;
13+
import java.util.Set;
14+
import java.util.function.Supplier;
1015

1116
/**
1217
* Testcontainers implementation for Apache Kafka.
@@ -43,6 +48,10 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
4348

4449
private String clusterId = DEFAULT_CLUSTER_ID;
4550

51+
private static final String PROTOCOL_PREFIX = "TC";
52+
53+
private final Set<Supplier<String>> listeners = new HashSet<>();
54+
4655
/**
4756
* @deprecated use {@link #KafkaContainer(DockerImageName)} instead
4857
*/
@@ -63,10 +72,6 @@ public KafkaContainer(final DockerImageName dockerImageName) {
6372
super(dockerImageName);
6473
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
6574

66-
// Use two listeners with different names, it will force Kafka to communicate with itself via internal
67-
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
68-
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
69-
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
7075
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
7176

7277
withEnv("KAFKA_BROKER_ID", "1");
@@ -140,6 +145,37 @@ public String getBootstrapServers() {
140145

141146
@Override
142147
protected void configure() {
148+
// Use two listeners with different names, it will force Kafka to communicate with itself via internal
149+
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
150+
Set<String> listeners = new HashSet<>();
151+
listeners.add("PLAINTEXT://0.0.0.0:" + KAFKA_PORT);
152+
listeners.add("BROKER://0.0.0.0:9092");
153+
154+
Set<String> listenerSecurityProtocolMap = new HashSet<>();
155+
listenerSecurityProtocolMap.add("BROKER:PLAINTEXT");
156+
listenerSecurityProtocolMap.add("PLAINTEXT:PLAINTEXT");
157+
158+
List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
159+
for (int i = 0; i < listenersToTransform.size(); i++) {
160+
Supplier<String> listenerSupplier = listenersToTransform.get(i);
161+
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
162+
String listener = listenerSupplier.get();
163+
String listenerPort = listener.split(":")[1];
164+
String listenerProtocol = String.format("%s://0.0.0.0:%s", protocol, listenerPort);
165+
String protocolMap = String.format("%s:PLAINTEXT", protocol);
166+
listeners.add(listenerProtocol);
167+
listenerSecurityProtocolMap.add(protocolMap);
168+
169+
String host = listener.split(":")[0];
170+
withNetworkAliases(host);
171+
}
172+
173+
String kafkaListeners = String.join(",", listeners);
174+
String kafkaListenerSecurityProtocolMap = String.join(",", listenerSecurityProtocolMap);
175+
176+
withEnv("KAFKA_LISTENERS", kafkaListeners);
177+
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap);
178+
143179
if (this.kraftEnabled) {
144180
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
145181
configureKraft();
@@ -187,14 +223,24 @@ protected void configureZookeeper() {
187223
protected void containerIsStarting(InspectContainerResponse containerInfo) {
188224
super.containerIsStarting(containerInfo);
189225

226+
List<String> advertisedListeners = new ArrayList<>();
227+
advertisedListeners.add(getBootstrapServers());
228+
advertisedListeners.add(brokerAdvertisedListener(containerInfo));
229+
230+
List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
231+
for (int i = 0; i < listenersToTransform.size(); i++) {
232+
Supplier<String> listenerSupplier = listenersToTransform.get(i);
233+
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
234+
String listener = listenerSupplier.get();
235+
String listenerProtocol = String.format("%s://%s", protocol, listener);
236+
advertisedListeners.add(listenerProtocol);
237+
}
238+
239+
String kafkaAdvertisedListeners = String.join(",", advertisedListeners);
240+
190241
String command = "#!/bin/bash\n";
191242
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
192-
command +=
193-
String.format(
194-
"export KAFKA_ADVERTISED_LISTENERS=%s,%s\n",
195-
getBootstrapServers(),
196-
brokerAdvertisedListener(containerInfo)
197-
);
243+
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);
198244

199245
if (this.kraftEnabled && isLessThanCP740()) {
200246
// Optimization: skip the checks
@@ -230,6 +276,31 @@ protected String commandZookeeper() {
230276
return command;
231277
}
232278

279+
/**
280+
* Add a {@link Supplier} that will provide a listener with format {@code host:port}.
281+
* Host will be added as a network alias.
282+
* <p>
283+
* The listener will be added to the list of default listeners.
284+
* <p>
285+
* Default listeners:
286+
* <ul>
287+
* <li>0.0.0.0:9092</li>
288+
* <li>0.0.0.0:9093</li>
289+
* </ul>
290+
* <p>
291+
* Default advertised listeners:
292+
* <ul>
293+
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
294+
* <li>{@code container.getConfig().getHostName():9092}</li>
295+
* </ul>
296+
* @param listenerSupplier a supplier that will provide a listener
297+
* @return this {@link KafkaContainer} instance
298+
*/
299+
public KafkaContainer withListener(Supplier<String> listenerSupplier) {
300+
this.listeners.add(listenerSupplier);
301+
return this;
302+
}
303+
233304
protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
234305
return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
235306
}

modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.junit.Test;
1717
import org.rnorth.ducttape.unreliables.Unreliables;
1818
import org.testcontainers.Testcontainers;
19+
import org.testcontainers.images.builder.Transferable;
1920
import org.testcontainers.utility.DockerImageName;
2021

2122
import java.time.Duration;
@@ -83,16 +84,9 @@ public void testExternalZookeeperWithExternalNetwork() throws Exception {
8384
.withNetwork(network)
8485
.withNetworkAliases("zookeeper")
8586
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
86-
// withKafkaNetwork {
87-
GenericContainer<?> application = new GenericContainer<>(DockerImageName.parse("alpine"))
88-
.withNetwork(network)
89-
// }
90-
.withNetworkAliases("dummy")
91-
.withCommand("sleep 10000")
9287
) {
9388
zookeeper.start();
9489
kafka.start();
95-
application.start();
9690

9791
testKafkaFunctionality(kafka.getBootstrapServers());
9892
}
@@ -195,6 +189,37 @@ public void testKraftPrecedenceOverEmbeddedZookeeper() throws Exception {
195189
}
196190
}
197191

192+
@Test
193+
public void testUsageWithListener() throws Exception {
194+
try (
195+
Network network = Network.newNetwork();
196+
// registerListener {
197+
KafkaContainer kafka = new KafkaContainer(KAFKA_KRAFT_TEST_IMAGE)
198+
.withListener(() -> "kafka:19092")
199+
.withNetwork(network);
200+
// }
201+
// createKCatContainer {
202+
GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.4.1")
203+
.withCreateContainerCmdModifier(cmd -> {
204+
cmd.withEntrypoint("sh");
205+
})
206+
.withCopyToContainer(Transferable.of("Message produced by kcat"), "/data/msgs.txt")
207+
.withNetwork(network)
208+
.withCommand("-c", "tail -f /dev/null")
209+
// }
210+
) {
211+
kafka.start();
212+
kcat.start();
213+
// produceConsumeMessage {
214+
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
215+
String stdout = kcat
216+
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
217+
.getStdout();
218+
// }
219+
assertThat(stdout).contains("Message produced by kcat");
220+
}
221+
}
222+
198223
protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
199224
testKafkaFunctionality(bootstrapServers, 1, 1);
200225
}

0 commit comments

Comments
 (0)