Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Redpanda improvements #7320

Merged
merged 18 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/redpanda/build.gradle
Expand Up @@ -2,6 +2,7 @@ description = "Testcontainers :: Redpanda"

dependencies {
api project(':testcontainers')
shaded 'org.freemarker:freemarker:2.3.32'

testImplementation 'org.apache.kafka:kafka-clients:3.5.0'
testImplementation 'org.assertj:assertj-core:3.24.2'
Expand Down
@@ -1,11 +1,31 @@
package org.testcontainers.redpanda;

import com.github.dockerjava.api.command.InspectContainerResponse;
import freemarker.template.Configuration;
import freemarker.template.Template;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.SneakyThrows;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Testcontainers implementation for Redpanda.
Expand All @@ -24,9 +44,21 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {

private static final int REDPANDA_PORT = 9092;

private static final int REDPANDA_ADMIN_PORT = 9644;

private static final int SCHEMA_REGISTRY_PORT = 8081;

private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
private static final int REST_PROXY_PORT = 8082;

private boolean enableAuthorization;

private String authenticationMethod = "none";

private String schemaRegistryAuthenticationMethod = "none";

private final List<String> superusers = new ArrayList<>();

private final Set<Supplier<Listener>> listenersValueSupplier = new HashSet<>();

public RedpandaContainer(String image) {
this(DockerImageName.parse(image));
Expand All @@ -41,26 +73,38 @@ public RedpandaContainer(DockerImageName imageName) {
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
}

withExposedPorts(REDPANDA_PORT, SCHEMA_REGISTRY_PORT);
withExposedPorts(REDPANDA_PORT, REDPANDA_ADMIN_PORT, SCHEMA_REGISTRY_PORT, REST_PROXY_PORT);
withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
cmd.withEntrypoint();
cmd.withUser("root:root");
});
waitingFor(Wait.forLogMessage(".*Started Kafka API server.*", 1));
withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
waitingFor(Wait.forLogMessage(".*Successfully started Redpanda!.*", 1));
withCopyFileToContainer(
MountableFile.forClasspathResource("testcontainers/entrypoint-tc.sh", 0700),
"/entrypoint-tc.sh"
);
withCommand("/entrypoint-tc.sh", "redpanda", "start", "--mode=dev-container", "--smp=1", "--memory=1G");
}

@Override
protected void configure() {
this.listenersValueSupplier.stream()
.map(Supplier::get)
.map(Listener::getAddress)
.forEach(this::withNetworkAliases);
}

@SneakyThrows
@Override
protected void containerIsStarting(InspectContainerResponse containerInfo) {
super.containerIsStarting(containerInfo);

String command = "#!/bin/bash\n";

command += "/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G ";
command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
command +=
"--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);
Configuration cfg = new Configuration(Configuration.DEFAULT_INCOMPATIBLE_IMPROVEMENTS);
cfg.setClassLoaderForTemplateLoading(getClass().getClassLoader(), "testcontainers");
cfg.setDefaultEncoding("UTF-8");

copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
copyFileToContainer(getBootstrapFile(cfg), "/etc/redpanda/.bootstrap.yaml");
copyFileToContainer(getRedpandaFile(cfg), "/etc/redpanda/redpanda.yaml");
}

public String getBootstrapServers() {
Expand All @@ -70,4 +114,104 @@ public String getBootstrapServers() {
public String getSchemaRegistryAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT));
}

public String getAdminAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(REDPANDA_ADMIN_PORT));
}

public String getRestProxyAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(REST_PROXY_PORT));
}

public RedpandaContainer enableAuthorization() {
this.enableAuthorization = true;
return this;
}

public RedpandaContainer enableSasl() {
this.authenticationMethod = "sasl";
return this;
}

public RedpandaContainer enableSchemaRegistryHttpBasicAuth() {
this.schemaRegistryAuthenticationMethod = "http_basic";
return this;
}

public RedpandaContainer withSuperuser(String username) {
this.superusers.add(username);
return this;
}

public RedpandaContainer withListener(Supplier<String> listenerSupplier) {
String[] parts = listenerSupplier.get().split(":");
this.listenersValueSupplier.add(() -> new Listener(parts[0], Integer.parseInt(parts[1])));
return this;
}

private Transferable getBootstrapFile(Configuration cfg) {
Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("superusers", this.superusers);

Map<String, Object> root = new HashMap<>();
root.put("kafkaApi", kafkaApi);

String file = resolveTemplate(cfg, "bootstrap.yaml.ftl", root);

return Transferable.of(file, 0700);
}

private Transferable getRedpandaFile(Configuration cfg) {
List<Map<String, Object>> listeners =
this.listenersValueSupplier.stream()
.map(Supplier::get)
.map(listener -> {
Map<String, Object> listenerMap = new HashMap<>();
listenerMap.put("address", listener.getAddress());
listenerMap.put("port", listener.getPort());
return listenerMap;
})
.collect(Collectors.toList());

Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("authenticationMethod", this.authenticationMethod);
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("advertisedHost", getHost());
kafkaApi.put("advertisedPort", getMappedPort(9092));
kafkaApi.put("listeners", listeners);

Map<String, Object> schemaRegistry = new HashMap<>();
schemaRegistry.put("authenticationMethod", this.schemaRegistryAuthenticationMethod);

Map<String, Object> root = new HashMap<>();
root.put("kafkaApi", kafkaApi);
root.put("schemaRegistry", schemaRegistry);

String file = resolveTemplate(cfg, "redpanda.yaml.ftl", root);

return Transferable.of(file, 0700);
}

@SneakyThrows
private String resolveTemplate(Configuration cfg, String template, Map<String, Object> data) {
Template temp = cfg.getTemplate(template);

@Cleanup
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@Cleanup
Writer out = new OutputStreamWriter(byteArrayOutputStream, StandardCharsets.UTF_8);
temp.process(data, out);

return new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8);
}

@Data
@AllArgsConstructor
private static class Listener {

private String address;

private int port;
}
}
@@ -0,0 +1,18 @@
# Injected by testcontainers
# This file contains cluster properties which will only be considered when
# starting the cluster for the first time. Afterwards, you can configure cluster
# properties via the Redpanda Admi n API.
superusers:
<#if kafkaApi.superusers?has_content >
<#list kafkaApi.superusers as superuser>
- ${superuser}
</#list>
<#else>
[]
</#if>

<#if kafkaApi.enableAuthorization >
kafka_enable_authorization: true
</#if>

auto_create_topics_enabled: true
@@ -0,0 +1,8 @@
#!/usr/bin/env bash

# Wait for testcontainer's injected redpanda config with the port only known after docker start
until grep -q "# Injected by testcontainers" "/etc/redpanda/redpanda.yaml"
do
sleep 0.1
done
exec /entrypoint.sh $@
@@ -0,0 +1,73 @@
# Injected by testcontainers
<#setting boolean_format="c">
<#setting number_format="c">
redpanda:
admin:
address: 0.0.0.0
port: 9644

kafka_api:
- address: 0.0.0.0
name: external
port: 9092
authentication_method: ${ kafkaApi.authenticationMethod }

# This listener is required for the schema registry client. The schema
# registry client connects via an advertised listener like a normal Kafka
# client would do. It can't use the other listener because the mapped
# port is not accessible from within the Redpanda container.
- address: 0.0.0.0
name: internal
port: 9093
authentication_method: <#if kafkaApi.enableAuthorization >sasl<#else>none</#if>

<#list kafkaApi.listeners as listener>
- address: 0.0.0.0
name: ${listener.address}
port: ${listener.port}
</#list>

advertised_kafka_api:
- address: ${ kafkaApi.advertisedHost }
name: external
port: ${ kafkaApi.advertisedPort }
- address: 127.0.0.1
name: internal
port: 9093
<#list kafkaApi.listeners as listener>
- address: ${listener.address}
name: ${listener.address}
port: ${listener.port}
</#list>

schema_registry:
schema_registry_api:
- address: "0.0.0.0"
name: main
port: 8081
authentication_method: ${ schemaRegistry.authenticationMethod }

schema_registry_client:
brokers:
- address: localhost
port: 9093

pandaproxy:
pandaproxy_api:
- address: 0.0.0.0
port: 8082
name: proxy-internal
advertised_pandaproxy_api:
- address: 127.0.0.1
port: 8082
name: proxy-internal

pandaproxy_client:
brokers:
- address: localhost
port: 9093

rpk:
kafka_api:
brokers:
- localhost:9093