docs/modules/kafka.md: 23 changes (16 additions & 7 deletions)
@@ -42,18 +42,27 @@ KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)"

See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details.

## Multi-container usage
## Register listeners

If your test needs to run some other Docker container which needs access to Kafka, do the following:
There are scenarios where additional listeners are needed, because the consumer or producer can be in another
container on the same network, or in a different process where the port to connect to differs from the default
exposed port `9093`. See, for example, [Toxiproxy](../../docs/modules/toxiproxy.md).

* Run your other container on the same network as Kafka container, e.g.:
<!--codeinclude-->
[Network](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKafkaNetwork
[Register additional listener](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:registerListener
<!--/codeinclude-->
* Use `kafka.getNetworkAliases().get(0)+":9092"` as bootstrap server location.
Or just give your Kafka container a network alias of your liking.

You will need to explicitly create a network and set it on the Kafka container as well as on your other containers that need to communicate with Kafka.
A container defined on the same network:

<!--codeinclude-->
[Create kcat container](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:createKCatContainer
<!--/codeinclude-->

A client using the newly registered listener:

<!--codeinclude-->
[Produce/Consume via new listener](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:produceConsumeMessage
<!--/codeinclude-->
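
If the other container is your own application rather than kcat, point its Kafka client at the registered
listener. A minimal sketch, assuming a hypothetical `my-app` image that reads `BOOTSTRAP_SERVERS` from its
environment (the image name, the variable name, and the `kafka:19092` value are illustrative):

```java
// Assumes the Network and KafkaContainer from the snippets above are in scope.
GenericContainer<?> app = new GenericContainer<>(DockerImageName.parse("my-app:latest")) // hypothetical image
    .withNetwork(network) // the same network as the Kafka container
    .withEnv("BOOTSTRAP_SERVERS", "kafka:19092"); // the listener registered via withListener()
app.start();
```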

## Adding this module to your project dependencies

modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java

@@ -6,7 +6,12 @@
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

/**
* Testcontainers implementation for Apache Kafka.
@@ -43,6 +48,10 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {

private String clusterId = DEFAULT_CLUSTER_ID;

private static final String PROTOCOL_PREFIX = "TC";

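// Additional listeners registered via withListener(); resolved lazily when the container starts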
private final Set<Supplier<String>> listeners = new HashSet<>();

/**
* @deprecated use {@link #KafkaContainer(DockerImageName)} instead
*/
@@ -63,10 +72,6 @@ public KafkaContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);

// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
@@ -140,6 +145,37 @@ public String getBootstrapServers() {

@Override
protected void configure() {
// Use two listeners with different names: this forces Kafka to communicate with itself via the internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set; otherwise Kafka will try to use the advertised listener
Set<String> listeners = new HashSet<>();
listeners.add("PLAINTEXT://0.0.0.0:" + KAFKA_PORT);
listeners.add("BROKER://0.0.0.0:9092");

Set<String> listenerSecurityProtocolMap = new HashSet<>();
listenerSecurityProtocolMap.add("BROKER:PLAINTEXT");
listenerSecurityProtocolMap.add("PLAINTEXT:PLAINTEXT");

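// Map each registered listener to a synthetic "TC-<index>" PLAINTEXT protocol, bind it on all
// interfaces at the listener's port, and add the listener's host as a network alias of this container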
List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerPort = listener.split(":")[1];
String listenerProtocol = String.format("%s://0.0.0.0:%s", protocol, listenerPort);
String protocolMap = String.format("%s:PLAINTEXT", protocol);
listeners.add(listenerProtocol);
listenerSecurityProtocolMap.add(protocolMap);

String host = listener.split(":")[0];
withNetworkAliases(host);
}

String kafkaListeners = String.join(",", listeners);
String kafkaListenerSecurityProtocolMap = String.join(",", listenerSecurityProtocolMap);

withEnv("KAFKA_LISTENERS", kafkaListeners);
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap);

if (this.kraftEnabled) {
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
configureKraft();
@@ -187,14 +223,24 @@ protected void configureZookeeper() {
protected void containerIsStarting(InspectContainerResponse containerInfo) {
super.containerIsStarting(containerInfo);

List<String> advertisedListeners = new ArrayList<>();
advertisedListeners.add(getBootstrapServers());
advertisedListeners.add(brokerAdvertisedListener(containerInfo));

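// Advertise each registered listener under its "TC-<index>" protocol at the supplied host:port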
List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerProtocol = String.format("%s://%s", protocol, listener);
advertisedListeners.add(listenerProtocol);
}

String kafkaAdvertisedListeners = String.join(",", advertisedListeners);

String command = "#!/bin/bash\n";
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
command +=
String.format(
"export KAFKA_ADVERTISED_LISTENERS=%s,%s\n",
getBootstrapServers(),
brokerAdvertisedListener(containerInfo)
);
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);

if (this.kraftEnabled && isLessThanCP740()) {
// Optimization: skip the checks
@@ -230,6 +276,31 @@ protected String commandZookeeper() {
return command;
}

/**
* Add a {@link Supplier} that will provide a listener in the format {@code host:port}.
* The host will also be added as a network alias of the container.
* <p>
* The listener will be added to the list of default listeners.
* <p>
* Default listeners:
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* </ul>
* <p>
* Default advertised listeners:
* <ul>
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
* <li>{@code container.getConfig().getHostName():9092}</li>
* </ul>
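* <p>
* A minimal usage sketch (the {@code kafka:19092} host and port are illustrative):
* <pre>{@code
* Network network = Network.newNetwork();
* KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
*     .withListener(() -> "kafka:19092")
*     .withNetwork(network);
* }</pre>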
* @param listenerSupplier a supplier that will provide a listener
* @return this {@link KafkaContainer} instance
*/
public KafkaContainer withListener(Supplier<String> listenerSupplier) {
this.listeners.add(listenerSupplier);
return this;
}

protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
}

modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java

@@ -16,6 +16,7 @@
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.Testcontainers;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
@@ -83,16 +84,9 @@ public void testExternalZookeeperWithExternalNetwork() throws Exception {
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
// withKafkaNetwork {
GenericContainer<?> application = new GenericContainer<>(DockerImageName.parse("alpine"))
.withNetwork(network)
// }
.withNetworkAliases("dummy")
.withCommand("sleep 10000")
) {
zookeeper.start();
kafka.start();
application.start();

testKafkaFunctionality(kafka.getBootstrapServers());
}
@@ -195,6 +189,37 @@ public void testKraftPrecedenceOverEmbeddedZookeeper() throws Exception {
}
}

@Test
public void testUsageWithListener() throws Exception {
try (
Network network = Network.newNetwork();
// registerListener {
KafkaContainer kafka = new KafkaContainer(KAFKA_KRAFT_TEST_IMAGE)
.withListener(() -> "kafka:19092")
.withNetwork(network);
// }
// createKCatContainer {
GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.4.1")
.withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
})
.withCopyToContainer(Transferable.of("Message produced by kcat"), "/data/msgs.txt")
.withNetwork(network)
.withCommand("-c", "tail -f /dev/null")
// }
) {
kafka.start();
kcat.start();
// produceConsumeMessage {
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
String stdout = kcat
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
.getStdout();
// }
assertThat(stdout).contains("Message produced by kcat");
}
}

protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}