Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Redpanda improvements #7320

Merged
merged 18 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/modules/redpanda.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,44 @@ Redpanda also provides a schema registry implementation. Like the Redpanda broke
[Schema Registry](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getSchemaRegistryAddress
<!--/codeinclude-->

It is also possible to enable security capabilities of Redpanda by using:

<!--codeinclude-->
[Enable security](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:security
<!--/codeinclude-->

Superusers can be created by using:

<!--codeinclude-->
[Register Superuser](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createSuperUser
<!--/codeinclude-->

Below is an example of how to create the `AdminClient`:

<!--codeinclude-->
[Create Admin Client](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createAdminClient
<!--/codeinclude-->

There are scenarios where additional listeners are needed because the consumer/producer can be another
container in the same network or a different process where the port to connect differs from the default
exposed port `9092`. E.g [Toxiproxy](../../docs/modules/toxiproxy.md).

<!--codeinclude-->
[Register additional listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:registerListener
<!--/codeinclude-->

Container defined in the same network:

<!--codeinclude-->
[Create kcat container](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createKCatContainer
<!--/codeinclude-->

Client using the new registered listener:

<!--codeinclude-->
[Produce/Consume via new listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:produceConsumeMessage
<!--/codeinclude-->

## Adding this module to your project dependencies

Add the following dependency to your `pom.xml`/`build.gradle` file:
Expand Down
1 change: 1 addition & 0 deletions modules/redpanda/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ description = "Testcontainers :: Redpanda"

dependencies {
api project(':testcontainers')
shaded 'org.freemarker:freemarker:2.3.32'

testImplementation 'org.apache.kafka:kafka-clients:3.5.1'
testImplementation 'org.assertj:assertj-core:3.24.2'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
package org.testcontainers.redpanda;

import com.github.dockerjava.api.command.InspectContainerResponse;
import freemarker.template.Configuration;
import freemarker.template.Template;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.SneakyThrows;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Testcontainers implementation for Redpanda.
Expand All @@ -14,6 +34,7 @@
* <ul>
* <li>Broker: 9092</li>
* <li>Schema Registry: 8081</li>
* <li>Proxy: 8082</li>
* </ul>
*/
public class RedpandaContainer extends GenericContainer<RedpandaContainer> {
Expand All @@ -30,9 +51,21 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {

private static final int REDPANDA_PORT = 9092;

private static final int REDPANDA_ADMIN_PORT = 9644;

private static final int SCHEMA_REGISTRY_PORT = 8081;

private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
private static final int REST_PROXY_PORT = 8082;

private boolean enableAuthorization;

private String authenticationMethod = "none";

private String schemaRegistryAuthenticationMethod = "none";

private final List<String> superusers = new ArrayList<>();

private final Set<Supplier<Listener>> listenersValueSupplier = new HashSet<>();

public RedpandaContainer(String image) {
this(DockerImageName.parse(image));
Expand All @@ -47,33 +80,198 @@ public RedpandaContainer(DockerImageName imageName) {
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
}

withExposedPorts(REDPANDA_PORT, SCHEMA_REGISTRY_PORT);
withExposedPorts(REDPANDA_PORT, REDPANDA_ADMIN_PORT, SCHEMA_REGISTRY_PORT, REST_PROXY_PORT);
withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
cmd.withEntrypoint();
cmd.withUser("root:root");
});
waitingFor(Wait.forLogMessage(".*Started Kafka API server.*", 1));
withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
waitingFor(Wait.forLogMessage(".*Successfully started Redpanda!.*", 1));
withCopyFileToContainer(
MountableFile.forClasspathResource("testcontainers/entrypoint-tc.sh", 0700),
"/entrypoint-tc.sh"
);
withCommand("/entrypoint-tc.sh", "redpanda", "start", "--mode=dev-container", "--smp=1", "--memory=1G");
}

@Override
protected void configure() {
this.listenersValueSupplier.stream()
.map(Supplier::get)
.map(Listener::getAddress)
.forEach(this::withNetworkAliases);
}

@SneakyThrows
@Override
protected void containerIsStarting(InspectContainerResponse containerInfo) {
super.containerIsStarting(containerInfo);

String command = "#!/bin/bash\n";

command += "/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G ";
command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
command +=
"--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);
Configuration cfg = new Configuration(Configuration.DEFAULT_INCOMPATIBLE_IMPROVEMENTS);
cfg.setClassLoaderForTemplateLoading(getClass().getClassLoader(), "testcontainers");
cfg.setDefaultEncoding("UTF-8");

copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
copyFileToContainer(getBootstrapFile(cfg), "/etc/redpanda/.bootstrap.yaml");
copyFileToContainer(getRedpandaFile(cfg), "/etc/redpanda/redpanda.yaml");
}

/**
* Returns the bootstrap servers address.
* @return the bootstrap servers address
*/
public String getBootstrapServers() {
return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(REDPANDA_PORT));
}

/**
* Returns the schema registry address.
* @return the schema registry address
*/
public String getSchemaRegistryAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT));
}

/**
* Returns the admin address.
* @return the admin address
*/
public String getAdminAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(REDPANDA_ADMIN_PORT));
}

/**
* Returns the rest proxy address.
* @return the rest proxy address
*/
public String getRestProxyAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(REST_PROXY_PORT));
}

/**
* Enables authorization.
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer enableAuthorization() {
this.enableAuthorization = true;
return this;
}

/**
* Enables SASL.
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer enableSasl() {
this.authenticationMethod = "sasl";
return this;
}

/**
* Enables Http Basic Auth for Schema Registry.
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer enableSchemaRegistryHttpBasicAuth() {
this.schemaRegistryAuthenticationMethod = "http_basic";
return this;
}

/**
* Register username as a superuser.
* @param username username to register as a superuser
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer withSuperuser(String username) {
this.superusers.add(username);
return this;
}

/**
* Add a {@link Supplier} that will provide a listener with format {@code host:port}.
* Host will be added as a network alias.
* <p>
* The listener will be added to the default listeners.
* <p>
* Default listeners:
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* </ul>
* <p>
* Default advertised listeners:
* <ul>
* <li>{@code container.getHost():container.getMappedPort(9092)}</li>
* <li>127.0.0.1:9093</li>
* </ul>
* @param listenerSupplier a supplier that will provide a listener
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer withListener(Supplier<String> listenerSupplier) {
String[] parts = listenerSupplier.get().split(":");
this.listenersValueSupplier.add(() -> new Listener(parts[0], Integer.parseInt(parts[1])));
return this;
}

private Transferable getBootstrapFile(Configuration cfg) {
Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("superusers", this.superusers);

Map<String, Object> root = new HashMap<>();
root.put("kafkaApi", kafkaApi);

String file = resolveTemplate(cfg, "bootstrap.yaml.ftl", root);

return Transferable.of(file, 0700);
}

private Transferable getRedpandaFile(Configuration cfg) {
List<Map<String, Object>> listeners =
this.listenersValueSupplier.stream()
.map(Supplier::get)
.map(listener -> {
Map<String, Object> listenerMap = new HashMap<>();
listenerMap.put("address", listener.getAddress());
listenerMap.put("port", listener.getPort());
return listenerMap;
})
.collect(Collectors.toList());

Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("authenticationMethod", this.authenticationMethod);
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("advertisedHost", getHost());
kafkaApi.put("advertisedPort", getMappedPort(9092));
kafkaApi.put("listeners", listeners);

Map<String, Object> schemaRegistry = new HashMap<>();
schemaRegistry.put("authenticationMethod", this.schemaRegistryAuthenticationMethod);

Map<String, Object> root = new HashMap<>();
root.put("kafkaApi", kafkaApi);
root.put("schemaRegistry", schemaRegistry);

String file = resolveTemplate(cfg, "redpanda.yaml.ftl", root);

return Transferable.of(file, 0700);
}

@SneakyThrows
private String resolveTemplate(Configuration cfg, String template, Map<String, Object> data) {
Template temp = cfg.getTemplate(template);

@Cleanup
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@Cleanup
Writer out = new OutputStreamWriter(byteArrayOutputStream, StandardCharsets.UTF_8);
temp.process(data, out);

return new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8);
}

@Data
@AllArgsConstructor
private static class Listener {

private String address;

private int port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Injected by testcontainers
# This file contains cluster properties which will only be considered when
# starting the cluster for the first time. Afterwards, you can configure cluster
# properties via the Redpanda Admi n API.
superusers:
<#if kafkaApi.superusers?has_content >
<#list kafkaApi.superusers as superuser>
- ${superuser}
</#list>
<#else>
[]
</#if>

<#if kafkaApi.enableAuthorization >
kafka_enable_authorization: true
</#if>

auto_create_topics_enabled: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env bash

# Wait for testcontainer's injected redpanda config with the port only known after docker start
until grep -q "# Injected by testcontainers" "/etc/redpanda/redpanda.yaml"
do
sleep 0.1
done
exec /entrypoint.sh $@