Skip to content

Commit

Permalink
Allow to register additional listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
eddumelendez committed Jul 21, 2023
1 parent a7434fb commit 36f2cd2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import com.github.dockerjava.api.command.InspectContainerResponse;
import freemarker.template.Configuration;
import freemarker.template.Template;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.SneakyThrows;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
Expand All @@ -18,8 +20,12 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Testcontainers implementation for Redpanda.
Expand Down Expand Up @@ -50,6 +56,8 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {

private final List<String> superusers = new ArrayList<>();

private final Set<Supplier<Listener>> listenersValueSupplier = new HashSet<>();

public RedpandaContainer(String image) {
this(DockerImageName.parse(image));
}
Expand All @@ -76,6 +84,14 @@ public RedpandaContainer(DockerImageName imageName) {
withCommand("/entrypoint-tc.sh", "redpanda", "start", "--mode=dev-container", "--smp=1", "--memory=1G");
}

@Override
protected void configure() {
this.listenersValueSupplier.stream()
.map(Supplier::get)
.map(Listener::getAddress)
.forEach(this::withNetworkAliases);
}

@SneakyThrows
@Override
protected void containerIsStarting(InspectContainerResponse containerInfo) {
Expand Down Expand Up @@ -121,6 +137,12 @@ public RedpandaContainer withSuperuser(String username) {
return this;
}

public RedpandaContainer withListener(Supplier<String> listenerSupplier) {
String[] parts = listenerSupplier.get().split(":");
this.listenersValueSupplier.add(() -> new Listener(parts[0], Integer.parseInt(parts[1])));
return this;
}

private Transferable getBootstrapFile(Configuration cfg) {
Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("enableAuthorization", this.enableAuthorization);
Expand All @@ -135,11 +157,23 @@ private Transferable getBootstrapFile(Configuration cfg) {
}

private Transferable getRedpandaFile(Configuration cfg) {
List<Map<String, Object>> listeners =
this.listenersValueSupplier.stream()
.map(Supplier::get)
.map(listener -> {
Map<String, Object> listenerMap = new HashMap<>();
listenerMap.put("address", listener.getAddress());
listenerMap.put("port", listener.getPort());
return listenerMap;
})
.collect(Collectors.toList());

Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("authenticationMethod", this.authenticationMethod);
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("advertisedHost", getHost());
kafkaApi.put("advertisedPort", getMappedPort(9092));
kafkaApi.put("listeners", listeners);

Map<String, Object> schemaRegistry = new HashMap<>();
schemaRegistry.put("authenticationMethod", this.schemaRegistryAuthenticationMethod);
Expand All @@ -165,4 +199,13 @@ private String resolveTemplate(Configuration cfg, String template, Map<String, O

return new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8);
}

@Data
@AllArgsConstructor
private static class Listener {

private String address;

private int port;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,24 @@ redpanda:
port: 9093
authentication_method: <#if kafkaApi.enableAuthorization >sasl<#else>none</#if>

<#list kafkaApi.listeners as listener>
- address: 0.0.0.0
name: ${listener.address}
port: ${listener.port}
</#list>

advertised_kafka_api:
- address: ${ kafkaApi.advertisedHost }
name: external
port: ${ kafkaApi.advertisedPort }
- address: 127.0.0.1
name: internal
port: 9093
<#list kafkaApi.listeners as listener>
- address: ${listener.address}
name: ${listener.address}
port: ${listener.port}
</#list>

schema_registry:
schema_registry_api:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ public void testSchemaRegistry() {
}
}

@Test
public void testUsageWithListener() throws Exception {
try (
RedpandaContainer container = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.7")
.withListener(() -> "redpanda:19092")
) {
container.start();
testKafkaFunctionality(container.getBootstrapServers());
}
}

@SneakyThrows
@Test
public void enableSaslWithSuccessfulTopicCreation() {
Expand Down

0 comments on commit 36f2cd2

Please sign in to comment.