Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kafka internal docker connection #2490

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
10 changes: 5 additions & 5 deletions modules/kafka/go.mod
Expand Up @@ -3,7 +3,7 @@ module github.com/testcontainers/testcontainers-go/modules/kafka
go 1.21

require (
github.com/IBM/sarama v1.42.1
github.com/IBM/sarama v1.43.2
github.com/docker/go-connections v0.5.0
github.com/testcontainers/testcontainers-go v0.31.0
golang.org/x/mod v0.16.0
Expand All @@ -22,7 +22,7 @@ require (
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/docker v25.0.5+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand All @@ -41,7 +41,7 @@ require (
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
Expand All @@ -51,7 +51,7 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
Expand All @@ -66,7 +66,7 @@ require (
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d // indirect
Expand Down
10 changes: 10 additions & 0 deletions modules/kafka/go.sum
Expand Up @@ -6,6 +6,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=
github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw=
github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8=
Expand Down Expand Up @@ -33,6 +35,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0=
github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30=
github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
Expand Down Expand Up @@ -88,6 +92,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
Expand All @@ -108,6 +114,8 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -183,6 +191,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
157 changes: 125 additions & 32 deletions modules/kafka/kafka.go
Expand Up @@ -20,7 +20,7 @@ const (
// starterScript {
starterScriptContent = `#!/bin/bash
source /etc/confluent/docker/bash-config
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092
export KAFKA_ADVERTISED_LISTENERS=%s
echo Starting Kafka KRaft mode
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
Expand All @@ -34,6 +34,13 @@ echo '' > /etc/confluent/docker/ensure
type KafkaContainer struct {
testcontainers.Container
ClusterID string
Listeners KafkaListener
}

type KafkaListener struct {
Name string
Ip string
Port string
}

// RunContainer creates an instance of the Kafka container type
Expand All @@ -43,10 +50,10 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
ExposedPorts: []string{string(publicPort)},
Env: map[string]string{
// envVars {
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER",
"KAFKA_LISTENERS": "EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_REST_BOOTSTRAP_SERVERS": "EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME": "INTERNAL",
"KAFKA_BROKER_ID": "1",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS": "1",
Expand All @@ -62,29 +69,61 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
Entrypoint: []string{"sh"},
// this CMD will wait for the starter script to be copied into the container and then execute it
Cmd: []string{"-c", "while [ ! -f " + starterScript + " ]; do sleep 0.1; done; bash " + starterScript},
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{
}

genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

settings := defaultOptions()
for _, opt := range opts {
if apply, ok := opt.(Option); ok {
apply(&settings)
}
if err := opt.Customize(&genericContainerReq); err != nil {
return nil, err
}
}

trimListeners(settings.Listeners)
if err := validateListeners(settings.Listeners); err != nil {
return nil, fmt.Errorf("listeners validation: %w", err)
}

// apply envs for listeners
envChange := editEnvsForListeners(settings.Listeners)
for key, item := range envChange {
genericContainerReq.Env[key] = item
}

genericContainerReq.ContainerRequest.LifecycleHooks =
[]testcontainers.ContainerLifecycleHooks{
{
PostStarts: []testcontainers.ContainerHook{
// 1. copy the starter script into the container
func(ctx context.Context, c testcontainers.Container) error {
host, err := c.Host(ctx)
if err != nil {
return err
if len(settings.Listeners) == 0 {
defaultInternal, err := internalListener(ctx, c)
if err != nil {
return fmt.Errorf("can't create default internal listener: %w", err)
}
settings.Listeners = append(settings.Listeners, defaultInternal)
}

inspect, err := c.Inspect(ctx)
defaultExternal, err := externalListener(ctx, c)
if err != nil {
return err
return fmt.Errorf("can't create default external listener: %w", err)
}

hostname := inspect.Config.Hostname
settings.Listeners = append(settings.Listeners, defaultExternal)

port, err := c.MappedPort(ctx, publicPort)
if err != nil {
return err
var advertised []string
for _, item := range settings.Listeners {
advertised = append(advertised, fmt.Sprintf("%s://%s:%s", item.Name, item.Ip, item.Port))
}

scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname)
scriptContent := fmt.Sprintf(starterScriptContent, strings.Join(advertised, ","))

return c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755)
},
Expand All @@ -94,19 +133,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
},
},
},
},
}

genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

for _, opt := range opts {
if err := opt.Customize(&genericContainerReq); err != nil {
return nil, err
}
}

err := validateKRaftVersion(genericContainerReq.Image)
if err != nil {
Expand All @@ -125,12 +152,78 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return &KafkaContainer{Container: container, ClusterID: clusterID}, nil
}

func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) error {
req.Env["CLUSTER_ID"] = clusterID
func trimListeners(listeners []KafkaListener) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we can embed this function into the validate listeners one? That will avoid having two steps in the call, and I also consider this code part of the validation

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you are right

for i := 0; i < len(listeners); i++ {
listeners[i].Name = strings.ToUpper(strings.Trim(listeners[i].Name, " "))
listeners[i].Ip = strings.Trim(listeners[i].Ip, " ")
listeners[i].Port = strings.Trim(listeners[i].Port, " ")
}
}

return nil
func validateListeners(listeners []KafkaListener) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add unit tests for this function 🙏

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

var ports map[string]bool = make(map[string]bool, len(listeners)+2)
var names map[string]bool = make(map[string]bool, len(listeners)+2)

// check for default listeners
ports["9094"] = true
ports["9093"] = true

// check for default listeners
names["CONTROLLER"] = true
names["EXTERNAL"] = true

for _, item := range listeners {
if names[item.Name] {
return fmt.Errorf("duplicate of listener name: %s", item.Name)
}
names[item.Name] = true

if ports[item.Port] {
return fmt.Errorf("duplicate of listener port: %s", item.Port)
}
ports[item.Port] = true
}

return nil
}

func editEnvsForListeners(listeners []KafkaListener) map[string]string {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add unit tests for this function 🙏

If possible, I think we can combine the functions receiving the listeners into just one. Wdyt?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I made them separate by using clean code principles. I think it will be more clear to people editing code after me

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine, I'm not against it. I simply looked at the code, and saw many steps. Not a blocker, as this is the internal details of the module, not public API

if len(listeners) == 0 {
// no change
return map[string]string{}
}

envs := map[string]string{
"KAFKA_LISTENERS": "CONTROLLER://0.0.0.0:9094, EXTERNAL://0.0.0.0:9093",
"KAFKA_REST_BOOTSTRAP_SERVERS": "CONTROLLER://0.0.0.0:9094, EXTERNAL://0.0.0.0:9093",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT, EXTERNAL:PLAINTEXT",
}

// expect first listener has common network between kafka instances
envs["KAFKA_INTER_BROKER_LISTENER_NAME"] = listeners[0].Name

// expect small number of listeners, so joins is okay
for _, item := range listeners {
envs["KAFKA_LISTENERS"] = strings.Join(
[]string{
envs["KAFKA_LISTENERS"],
fmt.Sprintf("%s://0.0.0.0:%s", item.Name, item.Port),
},
",",
)

envs["KAFKA_REST_BOOTSTRAP_SERVERS"] = envs["KAFKA_LISTENERS"]

envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = strings.Join(
[]string{
envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"],
item.Name + ":" + "PLAINTEXT",
},
",",
)
}

return envs
}

// Brokers retrieves the broker connection strings from Kafka with only one entry,
Expand Down
11 changes: 4 additions & 7 deletions modules/kafka/kafka_test.go
Expand Up @@ -98,15 +98,12 @@ func TestKafka_invalidVersion(t *testing.T) {
}

// assertAdvertisedListeners checks that the advertised listeners are set correctly:
// - The BROKER:// protocol is using the hostname of the Kafka container
// - The INTERNAL:// protocol is using the hostname of the Kafka container
func assertAdvertisedListeners(t *testing.T, container testcontainers.Container) {
inspect, err := container.Inspect(context.Background())
hostname, err := container.Host(context.Background())
if err != nil {
t.Fatal(err)
}

hostname := inspect.Config.Hostname

code, r, err := container.Exec(context.Background(), []string{"cat", "/usr/sbin/testcontainers_start.sh"})
if err != nil {
t.Fatal(err)
Expand All @@ -121,7 +118,7 @@ func assertAdvertisedListeners(t *testing.T, container testcontainers.Container)
t.Fatal(err)
}

if !strings.Contains(string(bs), "BROKER://"+hostname+":9092") {
t.Fatalf("expected advertised listeners to contain %s, got %s", "BROKER://"+hostname+":9092", string(bs))
if !strings.Contains(string(bs), "INTERNAL://"+hostname+":9092") {
t.Fatalf("expected advertised listeners to contain %s, got %s", "INTERNAL://"+hostname+":9092", string(bs))
}
}