From 8f52e1efd394570401f1184930ca7cb88bf6e724 Mon Sep 17 00:00:00 2001
From: Charith Ellawala
Date: Thu, 16 Nov 2023 15:23:56 +0000
Subject: [PATCH] chore(test): Fix Kafka integration tests (#1878)

Kafka tests have been flaky lately and it seems to be due to two factors:

- Redpanda RPK tries to create a temporary configuration file in the same
  location as the `redpanda.yaml` file. If we mount a volume containing
  `redpanda.yaml` this fails because the `redpanda` user (uid 101) doesn't
  have permission to chmod.
- A manifestation of https://github.com/moby/moby/issues/42442

This PR includes some refactoring to:

- Skip the RPK and start redpanda directly using a templated config file
- Combine the Kafka launch configuration to avoid repetition
- Output the Kafka container logs if `CERBOS_DEBUG_KAFKA` is set

Fixes #1875

Signed-off-by: Charith Ellawala

---------

Signed-off-by: Charith Ellawala
---
 .github/workflows/pr-test.yaml                |   1 +
 internal/audit/kafka/kafka_test.go            | 178 ++++++++++--------
 .../kafka/testdata/valid/{ => certs}/ca.crt   |   0
 .../audit/kafka/testdata/valid/certs/rpk.yaml |  25 +++
 .../kafka/testdata/valid/{ => certs}/tls.crt  |   0
 .../kafka/testdata/valid/{ => certs}/tls.key  |   0
 .../audit/kafka/testdata/valid/redpanda.yaml  |  28 ---
 .../valid/redpanda/redpanda.yaml.gotmpl       |  30 +++
 .../audit/kafka/testdata/valid/rpconfig.yaml  |  25 ---
 internal/audit/kafka/tls_test.go              |   2 +-
 10 files changed, 154 insertions(+), 135 deletions(-)
 rename internal/audit/kafka/testdata/valid/{ => certs}/ca.crt (100%)
 create mode 100644 internal/audit/kafka/testdata/valid/certs/rpk.yaml
 rename internal/audit/kafka/testdata/valid/{ => certs}/tls.crt (100%)
 rename internal/audit/kafka/testdata/valid/{ => certs}/tls.key (100%)
 delete mode 100644 internal/audit/kafka/testdata/valid/redpanda.yaml
 create mode 100644 internal/audit/kafka/testdata/valid/redpanda/redpanda.yaml.gotmpl
 delete mode 100644 internal/audit/kafka/testdata/valid/rpconfig.yaml

diff --git a/.github/workflows/pr-test.yaml b/.github/workflows/pr-test.yaml
index 917a4ed2e..c8c5a163b 100644
--- a/.github/workflows/pr-test.yaml
+++ b/.github/workflows/pr-test.yaml
@@ -110,6 +110,7 @@ jobs:
           CERBOS_TEST_LOG_LEVEL: "debug"
           CERBOS_DEBUG_DB: "true"
           CERBOS_DEBUG_ENGINE: "true"
+          CERBOS_DEBUG_KAFKA: "true"

       - name: Upload JUnit reports
         uses: actions/upload-artifact@v3
diff --git a/internal/audit/kafka/kafka_test.go b/internal/audit/kafka/kafka_test.go
index 7f12c9152..4baea3559 100644
--- a/internal/audit/kafka/kafka_test.go
+++ b/internal/audit/kafka/kafka_test.go
@@ -8,7 +8,10 @@ package kafka_test

 import (
 	"context"
+	"crypto/tls"
 	"fmt"
+	"net"
+	"os"
 	"path/filepath"
 	"strconv"
 	"testing"
@@ -24,23 +27,23 @@ import (
 	"github.com/cerbos/cerbos/internal/audit"
 	"github.com/cerbos/cerbos/internal/audit/kafka"
 	"github.com/cerbos/cerbos/internal/config"
+	"github.com/cerbos/cerbos/internal/test"
 	"github.com/cerbos/cerbos/internal/util"
 )

 const (
 	redpandaImage   = "redpandadata/redpanda"
-	redpandaVersion = "v23.1.5"
+	redpandaVersion = "v23.2.15"

 	defaultIntegrationTopic = "cerbos"
+	maxWait                 = 60 * time.Second
 )

 func TestProduceWithTLS(t *testing.T) {
-	t.Parallel()
-
 	ctx := context.Background()

 	// setup kafka
-	uri := newKafkaBrokerWithTLS(t, defaultIntegrationTopic, "testdata/valid/ca.crt", "testdata/valid/client/tls.crt", "testdata/valid/client/tls.key")
+	uri := newKafkaBrokerWithTLS(t, defaultIntegrationTopic, "testdata/valid/certs/ca.crt", "testdata/valid/client/tls.crt", "testdata/valid/client/tls.key")

 	log, err := newLog(map[string]any{
 		"audit": map[string]any{
 			"enabled": true,
@@ -48,7 +51,7 @@ func TestProduceWithTLS(t *testing.T) {
 			"kafka": map[string]any{
 				"authentication": map[string]any{
 					"tls": map[string]any{
-						"caPath":         "testdata/valid/ca.crt",
+						"caPath":         "testdata/valid/certs/ca.crt",
 						"certPath":       "testdata/valid/client/tls.crt",
 						"keyPath":        "testdata/valid/client/tls.key",
 						"reloadInterval": "10s",
@@ -84,8 +87,6 @@ func TestProduceWithTLS(t *testing.T) {
 }

 func TestSyncProduce(t *testing.T) {
-	t.Parallel()
-
 	ctx := context.Background()

 	// setup kafka
@@ -125,8 +126,6 @@ func TestSyncProduce(t *testing.T) {
 }

 func TestCompression(t *testing.T) {
-	t.Parallel()
-
 	ctx := context.Background()

 	// setup kafka
@@ -166,8 +165,6 @@ func TestCompression(t *testing.T) {
 }

 func TestAsyncProduce(t *testing.T) {
-	t.Parallel()
-
 	ctx := context.Background()

 	// setup kafka
@@ -211,116 +208,135 @@ func TestAsyncProduce(t *testing.T) {

 func newKafkaBrokerWithTLS(t *testing.T, topic, caPath, certPath, keyPath string) string {
 	t.Helper()

-	testDataAbsPath, err := filepath.Abs("testdata/valid")
-	require.NoError(t, err)
-
-	pool, err := dockertest.NewPool("")
-	require.NoError(t, err, "Failed to connect to Docker")
-
-	hostPort := 65136
-
-	resource, err := pool.RunWithOptions(&dockertest.RunOptions{
-		Repository: redpandaImage,
-		Tag:        redpandaVersion,
-		Cmd: []string{
-			"redpanda",
-			"start",
-			"--mode", "dev-container",
-			// kafka admin client will retrieve the advertised address from the broker
-			// so we need it to use the same port that is exposed on the container
-			"--config", "/etc/redpanda/rpconfig.yaml",
-			"--advertise-kafka-addr", fmt.Sprintf("localhost:%d", hostPort),
-		},
-		ExposedPorts: []string{
-			"9092/tcp",
-		},
-		PortBindings: map[docker.Port][]docker.PortBinding{
-			"9092/tcp": {{HostIP: "localhost", HostPort: strconv.Itoa(hostPort)}},
-		},
-		Mounts: []string{
-			fmt.Sprintf("%s:/etc/redpanda", testDataAbsPath),
-		},
-	}, func(config *docker.HostConfig) {
-		config.AutoRemove = true
-	})
-	require.NoError(t, err, "Failed to start container")
-
-	t.Cleanup(func() {
-		_ = pool.Purge(resource)
-	})
-
-	brokerDSN := fmt.Sprintf("localhost:%d", hostPort)
-	duration := 10 * time.Second
-	skipVerify := false
-
 	ctx, cancel := context.WithCancel(context.Background())
 	t.Cleanup(cancel)

+	duration := 10 * time.Second
+	skipVerify := false
 	tlsConfig, err := kafka.NewTLSConfig(ctx, duration, skipVerify, caPath, certPath, keyPath)
 	require.NoError(t, err)

-	client, err := kgo.NewClient(kgo.SeedBrokers(brokerDSN), kgo.DialTLSConfig(tlsConfig))
-	require.NoError(t, err)
+	return startKafkaBroker(t, topic, tlsConfig)
+}

-	require.NoError(t, pool.Retry(func() error {
-		return client.Ping(context.Background())
-	}), "Failed to connect to Kafka")
-	// create topic
-	_, err = kadm.NewClient(client).CreateTopic(context.Background(), 1, 1, nil, topic)
-	require.NoError(t, err, "Failed to create Kafka topic")
+func newKafkaBroker(t *testing.T, topic string) string {
+	t.Helper()

-	return brokerDSN
+	return startKafkaBroker(t, topic, nil)
 }

-func newKafkaBroker(t *testing.T, topic string) string {
+func startKafkaBroker(t *testing.T, topic string, tlsConfig *tls.Config) string {
 	t.Helper()

-	hostPort, err := util.GetFreePort()
-	require.NoError(t, err, "Unable to get free port")
+	port, err := util.GetFreePort()
+	require.NoError(t, err, "Failed to find free address")
+
+	testDataAbsPath, err := filepath.Abs("testdata/valid")
+	require.NoError(t, err)
+
+	cfg := test.RenderTemplate(t, filepath.Join(testDataAbsPath, "redpanda", "redpanda.yaml.gotmpl"), struct {
+		TLSEnabled bool
+		Port       int
+	}{
+		TLSEnabled: tlsConfig != nil,
+		Port:       port,
+	})
+	t.Logf("Config:\n%s\n", string(cfg))
+
+	tempDir := t.TempDir()
+	require.NoError(t, os.WriteFile(filepath.Join(tempDir, "redpanda.yaml"), cfg, 0o644))

 	pool, err := dockertest.NewPool("")
 	require.NoError(t, err, "Failed to connect to Docker")
+	pool.MaxWait = maxWait

-	resource, err := pool.RunWithOptions(&dockertest.RunOptions{
+	runOpts := &dockertest.RunOptions{
 		Repository: redpandaImage,
 		Tag:        redpandaVersion,
+		Entrypoint: []string{"/opt/redpanda/bin/redpanda"},
 		Cmd: []string{
-			"redpanda",
-			"start",
-			"--mode", "dev-container",
-			// kafka admin client will retrieve the advertised address from the broker
-			// so we need it to use the same port that is exposed on the container
-			"--advertise-kafka-addr", fmt.Sprintf("localhost:%d", hostPort),
+			"--redpanda-cfg",
+			"/etc/redpanda/redpanda.yaml",
+			"--unsafe-bypass-fsync=true",
+			"--reserve-memory=0M",
+			"--overprovisioned",
+			"--lock-memory=false",
+			"--default-log-level=error",
+			"--logger-log-level=kafka=info:request_auth=debug:security=debug",
 		},
 		ExposedPorts: []string{
 			"9092/tcp",
 		},
 		PortBindings: map[docker.Port][]docker.PortBinding{
-			"9092/tcp": {{HostIP: "localhost", HostPort: strconv.Itoa(hostPort)}},
+			"9092/tcp": {
+				{HostIP: "::1", HostPort: strconv.Itoa(port)},
+				{HostIP: "127.0.0.1", HostPort: strconv.Itoa(port)},
+			},
+		},
+		Mounts: []string{
+			fmt.Sprintf("%s:/certs", filepath.Join(testDataAbsPath, "certs")),
+			fmt.Sprintf("%s:/etc/redpanda", tempDir),
 		},
-	}, func(config *docker.HostConfig) {
+	}
+
+	var clientOpts []kgo.Opt
+	exposedPort := "9092/tcp"
+	if tlsConfig != nil {
+		clientOpts = append(clientOpts, kgo.DialTLSConfig(tlsConfig))
+	}
+
+	resource, err := pool.RunWithOptions(runOpts, func(config *docker.HostConfig) {
 		config.AutoRemove = true
 	})
-	require.NoError(t, err, "Failed to start container")

 	t.Cleanup(func() {
 		_ = pool.Purge(resource)
 	})

-	brokerDSN := fmt.Sprintf("localhost:%d", hostPort)
-	client, err := kgo.NewClient(kgo.SeedBrokers(brokerDSN))
+	brokerAddr := net.JoinHostPort("localhost", resource.GetPort(exposedPort))
+	clientOpts = append(clientOpts, kgo.SeedBrokers(brokerAddr))
+
+	if _, ok := os.LookupEnv("CERBOS_DEBUG_KAFKA"); ok {
+		ctx, cancelFunc := context.WithCancel(context.Background())
+		go func() {
+			if err := pool.Client.Logs(docker.LogsOptions{
+				Context:      ctx,
+				Container:    resource.Container.ID,
+				OutputStream: os.Stdout,
+				ErrorStream:  os.Stderr,
+				Stdout:       true,
+				Stderr:       true,
+				Follow:       true,
+			}); err != nil {
+				cancelFunc()
+			}
+		}()
+		t.Cleanup(cancelFunc)
+	}
+
+	client, err := kgo.NewClient(clientOpts...)
 	require.NoError(t, err)

 	require.NoError(t, pool.Retry(func() error {
-		return client.Ping(context.Background())
+		ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+		defer cancel()
+
+		if err := client.Ping(ctx); err != nil {
+			t.Logf("Ping failed: %v", err)
+			return err
+		}
+
+		return nil
 	}), "Failed to connect to Kafka")
 	// create topic
-	_, err = kadm.NewClient(client).CreateTopic(context.Background(), 1, 1, nil, topic)
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	t.Cleanup(cancel)
+	_, err = kadm.NewClient(client).CreateTopic(ctx, 1, 1, nil, topic)
 	require.NoError(t, err, "Failed to create Kafka topic")

-	return brokerDSN
+	return brokerAddr
 }

 func fetchKafkaTopic(t *testing.T, uri string, topic string, tlsEnabled bool) ([]*kgo.Record, error) {
@@ -331,7 +347,7 @@ func fetchKafkaTopic(t *testing.T, uri string, topic string, tlsEnabled bool) ([
 	ctx, cancel := context.WithCancel(context.Background())
 	t.Cleanup(cancel)

-	tlsConfig, err := kafka.NewTLSConfig(ctx, duration, skipVerify, "testdata/valid/ca.crt", "testdata/valid/client/tls.crt", "testdata/valid/client/tls.key")
+	tlsConfig, err := kafka.NewTLSConfig(ctx, duration, skipVerify, "testdata/valid/certs/ca.crt", "testdata/valid/client/tls.crt", "testdata/valid/client/tls.key")
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/audit/kafka/testdata/valid/ca.crt b/internal/audit/kafka/testdata/valid/certs/ca.crt
similarity index 100%
rename from internal/audit/kafka/testdata/valid/ca.crt
rename to internal/audit/kafka/testdata/valid/certs/ca.crt
diff --git a/internal/audit/kafka/testdata/valid/certs/rpk.yaml b/internal/audit/kafka/testdata/valid/certs/rpk.yaml
new file mode 100644
index 000000000..73857798b
--- /dev/null
+++ b/internal/audit/kafka/testdata/valid/certs/rpk.yaml
@@ -0,0 +1,25 @@
+redpanda:
+  data_directory: /var/lib/redpanda/data
+  seed_servers: []
+  kafka_api:
+    - address: 0.0.0.0
+      port: 9092
+  kafka_api_tls:
+    - key_file: /var/lib/redpanda/.config/rpk/tls.key
+      cert_file: /var/lib/redpanda/.config/rpk/tls.crt
+      truststore_file: /var/lib/redpanda/.config/rpk/ca.crt
+      enabled: true
+      require_client_auth: true
+  advertised_kafka_api:
+    - address: localhost
+      port: 65136
+  developer_mode: true
+  auto_create_topics_enabled: true
+  fetch_reads_debounce_timeout: 10
+  group_initial_rebalance_delay: 0
+  group_topic_partitions: 3
+  log_segment_size_min: 1
+  storage_min_free_bytes: 10485760
+  topic_partitions_per_shard: 1000
+rpk:
+  overprovisioned: true
diff --git a/internal/audit/kafka/testdata/valid/tls.crt b/internal/audit/kafka/testdata/valid/certs/tls.crt
similarity index 100%
rename from internal/audit/kafka/testdata/valid/tls.crt
rename to internal/audit/kafka/testdata/valid/certs/tls.crt
diff --git a/internal/audit/kafka/testdata/valid/tls.key b/internal/audit/kafka/testdata/valid/certs/tls.key
similarity index 100%
rename from internal/audit/kafka/testdata/valid/tls.key
rename to internal/audit/kafka/testdata/valid/certs/tls.key
diff --git a/internal/audit/kafka/testdata/valid/redpanda.yaml b/internal/audit/kafka/testdata/valid/redpanda.yaml
deleted file mode 100644
index b2151daaa..000000000
--- a/internal/audit/kafka/testdata/valid/redpanda.yaml
+++ /dev/null
@@ -1,28 +0,0 @@
-redpanda:
-  data_directory: /var/lib/redpanda/data
-  seed_servers: []
-  rpc_server:
-    address: 0.0.0.0
-    port: 33145
-  kafka_api:
-    - address: 0.0.0.0
-      port: 9092
-  admin:
-    - address: 0.0.0.0
-      port: 9644
-  advertised_kafka_api:
-    - address: localhost
-      port: 50504
-  developer_mode: true
-  auto_create_topics_enabled: true
-  fetch_reads_debounce_timeout: 10
-  group_initial_rebalance_delay: 0
-  group_topic_partitions: 3
-  log_segment_size_min: 1
-  storage_min_free_bytes: 10485760
-  topic_partitions_per_shard: 1000
-rpk:
-  coredump_dir: /var/lib/redpanda/coredump
-  overprovisioned: true
-pandaproxy: {}
-schema_registry: {}
diff --git a/internal/audit/kafka/testdata/valid/redpanda/redpanda.yaml.gotmpl b/internal/audit/kafka/testdata/valid/redpanda/redpanda.yaml.gotmpl
new file mode 100644
index 000000000..65f86ed29
--- /dev/null
+++ b/internal/audit/kafka/testdata/valid/redpanda/redpanda.yaml.gotmpl
@@ -0,0 +1,30 @@
+redpanda:
+  data_directory: /var/lib/redpanda/data
+  kafka_api:
+    - name: api_listener
+      address: 0.0.0.0
+      port: 9092
+{{- if .TLSEnabled }}
+  kafka_api_tls:
+    - name: api_listener
+      key_file: /certs/tls.key
+      cert_file: /certs/tls.crt
+      truststore_file: /certs/ca.crt
+      enabled: true
+      require_client_auth: true
+{{- end }}
+  advertised_kafka_api:
+    - name: api_listener
+      address: localhost
+      port: {{ .Port }}
+  developer_mode: true
+  auto_create_topics_enabled: true
+  fetch_reads_debounce_timeout: 10
+  group_initial_rebalance_delay: 0
+  group_topic_partitions: 3
+  log_segment_size_min: 1
+  storage_min_free_bytes: 10485760
+  topic_partitions_per_shard: 1000
+rpk:
+  coredump_dir: /var/lib/redpanda/coredump
+  overprovisioned: true
diff --git a/internal/audit/kafka/testdata/valid/rpconfig.yaml b/internal/audit/kafka/testdata/valid/rpconfig.yaml
deleted file mode 100644
index f66bd14cf..000000000
--- a/internal/audit/kafka/testdata/valid/rpconfig.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-redpanda:
-  data_directory: /var/lib/redpanda/data
-  seed_servers: []
-  kafka_api:
-    - address: 0.0.0.0
-      port: 9092
-  kafka_api_tls:
-    - key_file: /etc/redpanda/tls.key
-      cert_file: /etc/redpanda/tls.crt
-      truststore_file: /etc/redpanda/ca.crt
-      enabled: true
-      require_client_auth: true
-  advertised_kafka_api:
-    - address: localhost
-      port: 65136
-  developer_mode: true
-  auto_create_topics_enabled: true
-  fetch_reads_debounce_timeout: 10
-  group_initial_rebalance_delay: 0
-  group_topic_partitions: 3
-  log_segment_size_min: 1
-  storage_min_free_bytes: 10485760
-  topic_partitions_per_shard: 1000
-rpk:
-  overprovisioned: true
diff --git a/internal/audit/kafka/tls_test.go b/internal/audit/kafka/tls_test.go
index 8212f2d6e..8c6831f47 100644
--- a/internal/audit/kafka/tls_test.go
+++ b/internal/audit/kafka/tls_test.go
@@ -26,7 +26,7 @@ func TestNewTLSConfig(t *testing.T) {
 	_, err = kafka.NewTLSConfig(ctx, 0, false, "path/to/ca", "", "path/to/key")
 	require.EqualError(t, err, "certPath and keyPath must both be empty or both be non-empty")

-	caCertPath := "testdata/valid/ca.crt"
+	caCertPath := "testdata/valid/certs/ca.crt"

 	_, err = kafka.NewTLSConfig(ctx, 0, false, caCertPath, "", "")
 	require.NoError(t, err)
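
Note: the new startKafkaBroker helper relies on test.RenderTemplate to render redpanda.yaml.gotmpl with the TLSEnabled and Port values before mounting the result into the container, but that helper's implementation is not part of this patch. The following is a minimal sketch of what such a helper could look like using only the standard library's text/template package; the function body, comments, and error messages are assumptions for illustration and not the actual Cerbos code. It only has to satisfy the call site above: cfg := test.RenderTemplate(t, path, data) returning the rendered bytes.

// Hypothetical sketch of internal/test.RenderTemplate (not taken from this patch).
package test

import (
	"bytes"
	"path/filepath"
	"testing"
	"text/template"

	"github.com/stretchr/testify/require"
)

// RenderTemplate parses the Go text template at path, executes it with data
// and returns the rendered output. Any failure aborts the test via require.
func RenderTemplate(t *testing.T, path string, data any) []byte {
	t.Helper()

	// Name the template after the file so Execute runs the parsed file directly.
	tmpl, err := template.New(filepath.Base(path)).ParseFiles(path)
	require.NoError(t, err, "failed to parse template %s", path)

	var buf bytes.Buffer
	require.NoError(t, tmpl.Execute(&buf, data), "failed to execute template %s", path)

	return buf.Bytes()
}

Rendering the config in-process like this is what lets the test skip RPK entirely: the rendered file is written to a per-test temporary directory, mounted at /etc/redpanda, and the container entrypoint points straight at the redpanda binary with --redpanda-cfg /etc/redpanda/redpanda.yaml.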