Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Redpanda improvements #7320

Merged
merged 18 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 37 additions & 0 deletions docs/modules/redpanda.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,43 @@ Redpanda also provides a schema registry implementation. Like the Redpanda broke
[Schema Registry](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getSchemaRegistryAddress
<!--/codeinclude-->

It is also possible to enable security capabilities of Redpanda by using:

<!--codeinclude-->
[Enable security](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:security
<!--/codeinclude-->

Superusers can be created by using:

<!--codeinclude-->
[Register Superuser](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createSuperUser
<!--/codeinclude-->

Below is an example of how to create the `AdminClient`:

<!--codeinclude-->
[Create Admin Client](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createAdminClient
<!--/codeinclude-->

There are scenarios where connecting to the broker is needed. The consumer/producer can be another container
in the same network or a different process where port to connect to is different from the exposed port `9092`. E.g [Toxiproxy](../../docs/modules/toxiproxy.md).
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved

<!--codeinclude-->
[Register listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:registerListener
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved
<!--/codeinclude-->

Container defined in the same network:

<!--codeinclude-->
[Create kcat container](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createKCatContainer
<!--/codeinclude-->

Client using the new listener registered:
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved

<!--codeinclude-->
[Produce/Consume via new listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:produceConsumeMessage
<!--/codeinclude-->

## Adding this module to your project dependencies

Add the following dependency to your `pom.xml`/`build.gradle` file:
Expand Down
1 change: 1 addition & 0 deletions modules/redpanda/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ description = "Testcontainers :: Redpanda"

dependencies {
api project(':testcontainers')
shaded 'org.freemarker:freemarker:2.3.32'

testImplementation 'org.apache.kafka:kafka-clients:3.5.1'
testImplementation 'org.assertj:assertj-core:3.24.2'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
package org.testcontainers.redpanda;

import com.github.dockerjava.api.command.InspectContainerResponse;
import freemarker.template.Configuration;
import freemarker.template.Template;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.SneakyThrows;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Testcontainers implementation for Redpanda.
Expand All @@ -14,6 +34,7 @@
* <ul>
* <li>Broker: 9092</li>
* <li>Schema Registry: 8081</li>
* <li>Proxy: 8082</li>
* </ul>
*/
public class RedpandaContainer extends GenericContainer<RedpandaContainer> {
Expand All @@ -30,9 +51,21 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {

private static final int REDPANDA_PORT = 9092;

private static final int REDPANDA_ADMIN_PORT = 9644;

private static final int SCHEMA_REGISTRY_PORT = 8081;

private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
private static final int REST_PROXY_PORT = 8082;

private boolean enableAuthorization;

private String authenticationMethod = "none";

private String schemaRegistryAuthenticationMethod = "none";

private final List<String> superusers = new ArrayList<>();

private final Set<Supplier<Listener>> listenersValueSupplier = new HashSet<>();

public RedpandaContainer(String image) {
this(DockerImageName.parse(image));
Expand All @@ -47,33 +80,198 @@ public RedpandaContainer(DockerImageName imageName) {
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
}

withExposedPorts(REDPANDA_PORT, SCHEMA_REGISTRY_PORT);
withExposedPorts(REDPANDA_PORT, REDPANDA_ADMIN_PORT, SCHEMA_REGISTRY_PORT, REST_PROXY_PORT);
withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
cmd.withEntrypoint();
cmd.withUser("root:root");
});
waitingFor(Wait.forLogMessage(".*Started Kafka API server.*", 1));
withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
waitingFor(Wait.forLogMessage(".*Successfully started Redpanda!.*", 1));
withCopyFileToContainer(
MountableFile.forClasspathResource("testcontainers/entrypoint-tc.sh", 0700),
"/entrypoint-tc.sh"
);
withCommand("/entrypoint-tc.sh", "redpanda", "start", "--mode=dev-container", "--smp=1", "--memory=1G");
}

@Override
protected void configure() {
this.listenersValueSupplier.stream()
.map(Supplier::get)
.map(Listener::getAddress)
.forEach(this::withNetworkAliases);
}

@SneakyThrows
@Override
protected void containerIsStarting(InspectContainerResponse containerInfo) {
super.containerIsStarting(containerInfo);

String command = "#!/bin/bash\n";

command += "/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G ";
command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
command +=
"--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);
Configuration cfg = new Configuration(Configuration.DEFAULT_INCOMPATIBLE_IMPROVEMENTS);
cfg.setClassLoaderForTemplateLoading(getClass().getClassLoader(), "testcontainers");
cfg.setDefaultEncoding("UTF-8");

copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
copyFileToContainer(getBootstrapFile(cfg), "/etc/redpanda/.bootstrap.yaml");
copyFileToContainer(getRedpandaFile(cfg), "/etc/redpanda/redpanda.yaml");
}

/**
* Returns the bootstrap servers address.
* @return the bootstrap servers address
*/
public String getBootstrapServers() {
return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(REDPANDA_PORT));
}

/**
* Returns the schema registry address.
* @return the schema registry address
*/
public String getSchemaRegistryAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT));
}

/**
* Returns the admin address.
* @return the admin address
*/
public String getAdminAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(REDPANDA_ADMIN_PORT));
}

/**
* Returns the rest proxy address.
* @return the rest proxy address
*/
public String getRestProxyAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(REST_PROXY_PORT));
}

/**
* Enables authorization.
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer enableAuthorization() {
this.enableAuthorization = true;
return this;
}

/**
* Enables SASL.
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer enableSasl() {
this.authenticationMethod = "sasl";
return this;
}

/**
* Enables Http Basic Auth for Schema Registry.
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer enableSchemaRegistryHttpBasicAuth() {
this.schemaRegistryAuthenticationMethod = "http_basic";
return this;
}

/**
* Register username as a superuser.
* @param username username to register as a superuser
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer withSuperuser(String username) {
this.superusers.add(username);
return this;
}

/**
* Add a {@link Supplier} that will provide a listener with format {@code host:port}.
* Host will be added as a network alias.
* <p>
* The listener will be added to the existing listeners.
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved
* <p>
* Existing listeners:
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* </ul>
* <p>
* Existing advertised listeners:
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved
* <ul>
* <li>{@code container.getHost():container.getMappedPort(9092)}</li>
* <li>127.0.0.1:9093</li>
* </ul>
* @param listenerSupplier a supplier that will provide a listener
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer withListener(Supplier<String> listenerSupplier) {
String[] parts = listenerSupplier.get().split(":");
this.listenersValueSupplier.add(() -> new Listener(parts[0], Integer.parseInt(parts[1])));
return this;
}

private Transferable getBootstrapFile(Configuration cfg) {
Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("superusers", this.superusers);

Map<String, Object> root = new HashMap<>();
root.put("kafkaApi", kafkaApi);

String file = resolveTemplate(cfg, "bootstrap.yaml.ftl", root);

return Transferable.of(file, 0700);
}

private Transferable getRedpandaFile(Configuration cfg) {
List<Map<String, Object>> listeners =
this.listenersValueSupplier.stream()
.map(Supplier::get)
.map(listener -> {
Map<String, Object> listenerMap = new HashMap<>();
listenerMap.put("address", listener.getAddress());
listenerMap.put("port", listener.getPort());
return listenerMap;
})
.collect(Collectors.toList());

Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("authenticationMethod", this.authenticationMethod);
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("advertisedHost", getHost());
kafkaApi.put("advertisedPort", getMappedPort(9092));
kafkaApi.put("listeners", listeners);

Map<String, Object> schemaRegistry = new HashMap<>();
schemaRegistry.put("authenticationMethod", this.schemaRegistryAuthenticationMethod);

Map<String, Object> root = new HashMap<>();
root.put("kafkaApi", kafkaApi);
root.put("schemaRegistry", schemaRegistry);

String file = resolveTemplate(cfg, "redpanda.yaml.ftl", root);

return Transferable.of(file, 0700);
}

@SneakyThrows
private String resolveTemplate(Configuration cfg, String template, Map<String, Object> data) {
Template temp = cfg.getTemplate(template);

@Cleanup
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@Cleanup
Writer out = new OutputStreamWriter(byteArrayOutputStream, StandardCharsets.UTF_8);
temp.process(data, out);

return new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8);
}

@Data
@AllArgsConstructor
private static class Listener {

private String address;

private int port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Injected by testcontainers
# This file contains cluster properties which will only be considered when
# starting the cluster for the first time. Afterwards, you can configure cluster
# properties via the Redpanda Admi n API.
superusers:
<#if kafkaApi.superusers?has_content >
<#list kafkaApi.superusers as superuser>
- ${superuser}
</#list>
<#else>
[]
</#if>

<#if kafkaApi.enableAuthorization >
kafka_enable_authorization: true
</#if>

auto_create_topics_enabled: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env bash

# Wait for testcontainer's injected redpanda config with the port only known after docker start
until grep -q "# Injected by testcontainers" "/etc/redpanda/redpanda.yaml"
do
sleep 0.1
done
exec /entrypoint.sh $@