New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix kafka internal docker connection #2490
base: main
Are you sure you want to change the base?
Changes from 3 commits
320e54d
9b6537d
15f4ca3
c7552f4
8904938
7b38c28
98793d7
62a68b5
a7c4a9f
bfa9191
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ const ( | |
// starterScript { | ||
starterScriptContent = `#!/bin/bash | ||
source /etc/confluent/docker/bash-config | ||
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092 | ||
export KAFKA_ADVERTISED_LISTENERS=%s | ||
echo Starting Kafka KRaft mode | ||
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure | ||
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure | ||
|
@@ -34,6 +34,13 @@ echo '' > /etc/confluent/docker/ensure | |
type KafkaContainer struct { | ||
testcontainers.Container | ||
ClusterID string | ||
Listeners KafkaListener | ||
} | ||
|
||
type KafkaListener struct { | ||
Name string | ||
Ip string | ||
Port string | ||
} | ||
|
||
// RunContainer creates an instance of the Kafka container type | ||
|
@@ -43,10 +50,10 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize | |
ExposedPorts: []string{string(publicPort)}, | ||
Env: map[string]string{ | ||
// envVars { | ||
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", | ||
"KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", | ||
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT", | ||
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER", | ||
"KAFKA_LISTENERS": "EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", | ||
"KAFKA_REST_BOOTSTRAP_SERVERS": "EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", | ||
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT", | ||
"KAFKA_INTER_BROKER_LISTENER_NAME": "INTERNAL", | ||
"KAFKA_BROKER_ID": "1", | ||
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", | ||
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS": "1", | ||
|
@@ -62,29 +69,61 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize | |
Entrypoint: []string{"sh"}, | ||
// this CMD will wait for the starter script to be copied into the container and then execute it | ||
Cmd: []string{"-c", "while [ ! -f " + starterScript + " ]; do sleep 0.1; done; bash " + starterScript}, | ||
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{ | ||
} | ||
|
||
genericContainerReq := testcontainers.GenericContainerRequest{ | ||
ContainerRequest: req, | ||
Started: true, | ||
} | ||
|
||
settings := defaultOptions() | ||
for _, opt := range opts { | ||
if apply, ok := opt.(Option); ok { | ||
apply(&settings) | ||
} | ||
if err := opt.Customize(&genericContainerReq); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
trimListeners(settings.Listeners) | ||
if err := validateListeners(settings.Listeners); err != nil { | ||
return nil, fmt.Errorf("listeners validation: %w", err) | ||
} | ||
|
||
// apply envs for listeners | ||
envChange := editEnvsForListeners(settings.Listeners) | ||
for key, item := range envChange { | ||
genericContainerReq.Env[key] = item | ||
} | ||
|
||
genericContainerReq.ContainerRequest.LifecycleHooks = | ||
[]testcontainers.ContainerLifecycleHooks{ | ||
{ | ||
PostStarts: []testcontainers.ContainerHook{ | ||
// 1. copy the starter script into the container | ||
func(ctx context.Context, c testcontainers.Container) error { | ||
host, err := c.Host(ctx) | ||
if err != nil { | ||
return err | ||
if len(settings.Listeners) == 0 { | ||
defaultInternal, err := internalListener(ctx, c) | ||
if err != nil { | ||
return fmt.Errorf("can't create default internal listener: %w", err) | ||
} | ||
settings.Listeners = append(settings.Listeners, defaultInternal) | ||
} | ||
|
||
inspect, err := c.Inspect(ctx) | ||
defaultExternal, err := externalListener(ctx, c) | ||
if err != nil { | ||
return err | ||
return fmt.Errorf("can't create default external listener: %w", err) | ||
} | ||
|
||
hostname := inspect.Config.Hostname | ||
settings.Listeners = append(settings.Listeners, defaultExternal) | ||
|
||
port, err := c.MappedPort(ctx, publicPort) | ||
if err != nil { | ||
return err | ||
var advertised []string | ||
for _, item := range settings.Listeners { | ||
advertised = append(advertised, fmt.Sprintf("%s://%s:%s", item.Name, item.Ip, item.Port)) | ||
} | ||
|
||
scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname) | ||
scriptContent := fmt.Sprintf(starterScriptContent, strings.Join(advertised, ",")) | ||
|
||
return c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755) | ||
}, | ||
|
@@ -94,19 +133,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize | |
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
genericContainerReq := testcontainers.GenericContainerRequest{ | ||
ContainerRequest: req, | ||
Started: true, | ||
} | ||
|
||
for _, opt := range opts { | ||
if err := opt.Customize(&genericContainerReq); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
err := validateKRaftVersion(genericContainerReq.Image) | ||
if err != nil { | ||
|
@@ -125,12 +152,78 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize | |
return &KafkaContainer{Container: container, ClusterID: clusterID}, nil | ||
} | ||
|
||
func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption { | ||
return func(req *testcontainers.GenericContainerRequest) error { | ||
req.Env["CLUSTER_ID"] = clusterID | ||
func trimListeners(listeners []KafkaListener) { | ||
for i := 0; i < len(listeners); i++ { | ||
listeners[i].Name = strings.ToUpper(strings.Trim(listeners[i].Name, " ")) | ||
listeners[i].Ip = strings.Trim(listeners[i].Ip, " ") | ||
listeners[i].Port = strings.Trim(listeners[i].Port, " ") | ||
} | ||
} | ||
|
||
return nil | ||
func validateListeners(listeners []KafkaListener) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add unit tests for this function 🙏 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do |
||
var ports map[string]bool = make(map[string]bool, len(listeners)+2) | ||
var names map[string]bool = make(map[string]bool, len(listeners)+2) | ||
|
||
// check for default listeners | ||
ports["9094"] = true | ||
ports["9093"] = true | ||
|
||
// check for default listeners | ||
names["CONTROLLER"] = true | ||
names["EXTERNAL"] = true | ||
|
||
for _, item := range listeners { | ||
if names[item.Name] { | ||
return fmt.Errorf("duplicate of listener name: %s", item.Name) | ||
} | ||
names[item.Name] = true | ||
|
||
if ports[item.Port] { | ||
return fmt.Errorf("duplicate of listener port: %s", item.Port) | ||
} | ||
ports[item.Port] = true | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func editEnvsForListeners(listeners []KafkaListener) map[string]string { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add unit tests for this function 🙏 If possible, I think we can combine the functions receiving the listeners into just one. Wdyt? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, I made them separate by using clean code principles. I think it will be more clear to people editing code after me There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's fine, I'm not against it. I simply looked at the code, and saw many steps. Not a blocker, as this is the internal details of the module, not public API |
||
if len(listeners) == 0 { | ||
// no change | ||
return map[string]string{} | ||
} | ||
|
||
envs := map[string]string{ | ||
"KAFKA_LISTENERS": "CONTROLLER://0.0.0.0:9094, EXTERNAL://0.0.0.0:9093", | ||
"KAFKA_REST_BOOTSTRAP_SERVERS": "CONTROLLER://0.0.0.0:9094, EXTERNAL://0.0.0.0:9093", | ||
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT, EXTERNAL:PLAINTEXT", | ||
} | ||
|
||
// expect first listener has common network between kafka instances | ||
envs["KAFKA_INTER_BROKER_LISTENER_NAME"] = listeners[0].Name | ||
|
||
// expect small number of listeners, so joins is okay | ||
for _, item := range listeners { | ||
envs["KAFKA_LISTENERS"] = strings.Join( | ||
[]string{ | ||
envs["KAFKA_LISTENERS"], | ||
fmt.Sprintf("%s://0.0.0.0:%s", item.Name, item.Port), | ||
}, | ||
",", | ||
) | ||
|
||
envs["KAFKA_REST_BOOTSTRAP_SERVERS"] = envs["KAFKA_LISTENERS"] | ||
|
||
envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = strings.Join( | ||
[]string{ | ||
envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"], | ||
item.Name + ":" + "PLAINTEXT", | ||
}, | ||
",", | ||
) | ||
} | ||
|
||
return envs | ||
} | ||
|
||
// Brokers retrieves the broker connection strings from Kafka with only one entry, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think we can embed this function into the validate listeners one? That will avoid having two steps in the call, and I also consider this code part of the validation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you are right