Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KafkaContainer#withListener #7333

Merged
merged 10 commits into from
Aug 17, 2023
23 changes: 16 additions & 7 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,27 @@ KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)"

See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details.

## Multi-container usage
## Register listeners

If your test needs to run some other Docker container which needs access to Kafka, do the following:
There are scenarios where connecting to the broker is needed. The consumer/producer can be another container
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved
in the same network or a different process where port to connect to is different from the exposed port `9093`.
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved
E.g [Toxiproxy](../../docs/modules/toxiproxy.md).

* Run your other container on the same network as Kafka container, e.g.:
<!--codeinclude-->
[Network](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKafkaNetwork
[Register listener](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:registerListener
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved
<!--/codeinclude-->
* Use `kafka.getNetworkAliases().get(0)+":9092"` as bootstrap server location.
Or just give your Kafka container a network alias of your liking.

You will need to explicitly create a network and set it on the Kafka container as well as on your other containers that need to communicate with Kafka.
Container defined in the same network:

<!--codeinclude-->
[Create kcat container](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:createKCatContainer
<!--/codeinclude-->

Client using the new listener registered:
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved

<!--codeinclude-->
[Produce/Consume via new listener](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:produceConsumeMessage
<!--/codeinclude-->

## Adding this module to your project dependencies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

/**
* Testcontainers implementation for Apache Kafka.
Expand Down Expand Up @@ -43,6 +48,10 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {

private String clusterId = DEFAULT_CLUSTER_ID;

private static final String PROTOCOL_PREFIX = "TC";

private final Set<Supplier<String>> listeners = new HashSet<>();

/**
* @deprecated use {@link #KafkaContainer(DockerImageName)} instead
*/
Expand All @@ -63,10 +72,6 @@ public KafkaContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);

// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
Expand Down Expand Up @@ -140,6 +145,37 @@ public String getBootstrapServers() {

@Override
protected void configure() {
// Use two listeners with different names, it will force Kafka to communicate with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
Set<String> listeners = new HashSet<>();
listeners.add("PLAINTEXT://0.0.0.0:" + KAFKA_PORT);
listeners.add("BROKER://0.0.0.0:9092");

Set<String> listenerSecurityProtocolMap = new HashSet<>();
listenerSecurityProtocolMap.add("BROKER:PLAINTEXT");
listenerSecurityProtocolMap.add("PLAINTEXT:PLAINTEXT");

List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerPort = listener.split(":")[1];
String listenerProtocol = String.format("%s://0.0.0.0:%s", protocol, listenerPort);
String protocolMap = String.format("%s:PLAINTEXT", protocol);
listeners.add(listenerProtocol);
listenerSecurityProtocolMap.add(protocolMap);

String host = listener.split(":")[0];
withNetworkAliases(host);
}

String kafkaListeners = String.join(",", listeners);
String kafkaListenerSecurityProtocolMap = String.join(",", listenerSecurityProtocolMap);

withEnv("KAFKA_LISTENERS", kafkaListeners);
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaListenerSecurityProtocolMap);

if (this.kraftEnabled) {
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
configureKraft();
Expand Down Expand Up @@ -187,14 +223,24 @@ protected void configureZookeeper() {
protected void containerIsStarting(InspectContainerResponse containerInfo) {
super.containerIsStarting(containerInfo);

List<String> advertisedListeners = new ArrayList<>();
advertisedListeners.add(getBootstrapServers());
advertisedListeners.add(brokerAdvertisedListener(containerInfo));

List<Supplier<String>> listenersToTransform = new ArrayList<>(this.listeners);
for (int i = 0; i < listenersToTransform.size(); i++) {
Supplier<String> listenerSupplier = listenersToTransform.get(i);
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
String listener = listenerSupplier.get();
String listenerProtocol = String.format("%s://%s", protocol, listener);
advertisedListeners.add(listenerProtocol);
}

String kafkaAdvertisedListeners = String.join(",", advertisedListeners);

String command = "#!/bin/bash\n";
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
command +=
String.format(
"export KAFKA_ADVERTISED_LISTENERS=%s,%s\n",
getBootstrapServers(),
brokerAdvertisedListener(containerInfo)
);
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);

if (this.kraftEnabled && isLessThanCP740()) {
// Optimization: skip the checks
Expand Down Expand Up @@ -230,6 +276,31 @@ protected String commandZookeeper() {
return command;
}

/**
* Add a {@link Supplier} that will provide a listener with format {@code host:port}.
* Host will be added as a network alias.
* <p>
* The listener will be added to the existing listeners.
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved
* <p>
* Existing listeners:
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* </ul>
* <p>
* Existing advertised listeners:
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved
* <ul>
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
* <li>{@code container.getConfig().getHostName():9092}</li>
* </ul>
* @param listenerSupplier a supplier that will provide a listener
* @return this {@link KafkaContainer} instance
*/
public KafkaContainer withListener(Supplier<String> listenerSupplier) {
this.listeners.add(listenerSupplier);
return this;
}

protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.Testcontainers;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
Expand Down Expand Up @@ -83,16 +84,9 @@ public void testExternalZookeeperWithExternalNetwork() throws Exception {
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
// withKafkaNetwork {
GenericContainer<?> application = new GenericContainer<>(DockerImageName.parse("alpine"))
.withNetwork(network)
// }
.withNetworkAliases("dummy")
.withCommand("sleep 10000")
) {
zookeeper.start();
kafka.start();
application.start();

testKafkaFunctionality(kafka.getBootstrapServers());
}
Expand Down Expand Up @@ -195,6 +189,37 @@ public void testKraftPrecedenceOverEmbeddedZookeeper() throws Exception {
}
}

@Test
public void testUsageWithListener() throws Exception {
try (
Network network = Network.newNetwork();
// registerListener {
KafkaContainer kafka = new KafkaContainer(KAFKA_KRAFT_TEST_IMAGE)
.withListener(() -> "kafka:19092")
.withNetwork(network);
// }
// createKCatContainer {
GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.4.1")
.withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
})
.withCopyToContainer(Transferable.of("Message produced by kcat"), "/data/msgs.txt")
.withNetwork(network)
.withCommand("-c", "tail -f /dev/null")
// }
) {
kafka.start();
kcat.start();
// produceConsumeMessage {
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
String stdout = kcat
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
.getStdout();
// }
assertThat(stdout).contains("Message produced by kcat");
}
}

protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}
Expand Down