Skip to content

Commit

Permalink
chore(test): Fix Kafka integration tests (#1878)
Browse files Browse the repository at this point in the history
The Kafka tests have been flaky lately, apparently due to two
factors:
- Redpanda's RPK tries to create a temporary configuration file in the
same location as the `redpanda.yaml` file. If we mount a volume
containing `redpanda.yaml`, this fails because the `redpanda` user (uid
101) doesn't have permission to chmod files there.
- A manifestation of moby/moby#42442

This PR includes some refactoring to:
- Skip RPK and start Redpanda directly using a templated config file
- Combine the Kafka launch configuration to avoid repetition
- Output the Kafka container logs if `CERBOS_DEBUG_KAFKA` is set

Fixes #1875

Signed-off-by: Charith Ellawala <charith@cerbos.dev>

---------

Signed-off-by: Charith Ellawala <charith@cerbos.dev>
  • Loading branch information
charithe committed Nov 16, 2023
1 parent cf21eb0 commit 8f52e1e
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 135 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pr-test.yaml
Expand Up @@ -110,6 +110,7 @@ jobs:
CERBOS_TEST_LOG_LEVEL: "debug"
CERBOS_DEBUG_DB: "true"
CERBOS_DEBUG_ENGINE: "true"
CERBOS_DEBUG_KAFKA: "true"

- name: Upload JUnit reports
uses: actions/upload-artifact@v3
Expand Down
178 changes: 97 additions & 81 deletions internal/audit/kafka/kafka_test.go
Expand Up @@ -8,7 +8,10 @@ package kafka_test

import (
"context"
"crypto/tls"
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"testing"
Expand All @@ -24,31 +27,31 @@ import (
"github.com/cerbos/cerbos/internal/audit"
"github.com/cerbos/cerbos/internal/audit/kafka"
"github.com/cerbos/cerbos/internal/config"
"github.com/cerbos/cerbos/internal/test"
"github.com/cerbos/cerbos/internal/util"
)

const (
redpandaImage = "redpandadata/redpanda"
redpandaVersion = "v23.1.5"
redpandaVersion = "v23.2.15"

defaultIntegrationTopic = "cerbos"
maxWait = 60 * time.Second
)

func TestProduceWithTLS(t *testing.T) {
t.Parallel()

ctx := context.Background()

// setup kafka
uri := newKafkaBrokerWithTLS(t, defaultIntegrationTopic, "testdata/valid/ca.crt", "testdata/valid/client/tls.crt", "testdata/valid/client/tls.key")
uri := newKafkaBrokerWithTLS(t, defaultIntegrationTopic, "testdata/valid/certs/ca.crt", "testdata/valid/client/tls.crt", "testdata/valid/client/tls.key")
log, err := newLog(map[string]any{
"audit": map[string]any{
"enabled": true,
"backend": "kafka",
"kafka": map[string]any{
"authentication": map[string]any{
"tls": map[string]any{
"caPath": "testdata/valid/ca.crt",
"caPath": "testdata/valid/certs/ca.crt",
"certPath": "testdata/valid/client/tls.crt",
"keyPath": "testdata/valid/client/tls.key",
"reloadInterval": "10s",
Expand Down Expand Up @@ -84,8 +87,6 @@ func TestProduceWithTLS(t *testing.T) {
}

func TestSyncProduce(t *testing.T) {
t.Parallel()

ctx := context.Background()

// setup kafka
Expand Down Expand Up @@ -125,8 +126,6 @@ func TestSyncProduce(t *testing.T) {
}

func TestCompression(t *testing.T) {
t.Parallel()

ctx := context.Background()

// setup kafka
Expand Down Expand Up @@ -166,8 +165,6 @@ func TestCompression(t *testing.T) {
}

func TestAsyncProduce(t *testing.T) {
t.Parallel()

ctx := context.Background()

// setup kafka
Expand Down Expand Up @@ -211,116 +208,135 @@ func TestAsyncProduce(t *testing.T) {
func newKafkaBrokerWithTLS(t *testing.T, topic, caPath, certPath, keyPath string) string {
t.Helper()

testDataAbsPath, err := filepath.Abs("testdata/valid")
require.NoError(t, err)

pool, err := dockertest.NewPool("")
require.NoError(t, err, "Failed to connect to Docker")

hostPort := 65136

resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: redpandaImage,
Tag: redpandaVersion,
Cmd: []string{
"redpanda",
"start",
"--mode", "dev-container",
// kafka admin client will retrieve the advertised address from the broker
// so we need it to use the same port that is exposed on the container
"--config", "/etc/redpanda/rpconfig.yaml",
"--advertise-kafka-addr", fmt.Sprintf("localhost:%d", hostPort),
},
ExposedPorts: []string{
"9092/tcp",
},
PortBindings: map[docker.Port][]docker.PortBinding{
"9092/tcp": {{HostIP: "localhost", HostPort: strconv.Itoa(hostPort)}},
},
Mounts: []string{
fmt.Sprintf("%s:/etc/redpanda", testDataAbsPath),
},
}, func(config *docker.HostConfig) {
config.AutoRemove = true
})
require.NoError(t, err, "Failed to start container")

t.Cleanup(func() {
_ = pool.Purge(resource)
})

brokerDSN := fmt.Sprintf("localhost:%d", hostPort)
duration := 10 * time.Second
skipVerify := false

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

duration := 10 * time.Second
skipVerify := false
tlsConfig, err := kafka.NewTLSConfig(ctx, duration, skipVerify, caPath, certPath, keyPath)
require.NoError(t, err)

client, err := kgo.NewClient(kgo.SeedBrokers(brokerDSN), kgo.DialTLSConfig(tlsConfig))
require.NoError(t, err)
return startKafkaBroker(t, topic, tlsConfig)
}

require.NoError(t, pool.Retry(func() error {
return client.Ping(context.Background())
}), "Failed to connect to Kafka")
// create topic
_, err = kadm.NewClient(client).CreateTopic(context.Background(), 1, 1, nil, topic)
require.NoError(t, err, "Failed to create Kafka topic")
func newKafkaBroker(t *testing.T, topic string) string {
t.Helper()

return brokerDSN
return startKafkaBroker(t, topic, nil)
}

func newKafkaBroker(t *testing.T, topic string) string {
func startKafkaBroker(t *testing.T, topic string, tlsConfig *tls.Config) string {
t.Helper()

hostPort, err := util.GetFreePort()
require.NoError(t, err, "Unable to get free port")
port, err := util.GetFreePort()
require.NoError(t, err, "Failed to find free address")

testDataAbsPath, err := filepath.Abs("testdata/valid")
require.NoError(t, err)

cfg := test.RenderTemplate(t, filepath.Join(testDataAbsPath, "redpanda", "redpanda.yaml.gotmpl"), struct {
TLSEnabled bool
Port int
}{
TLSEnabled: tlsConfig != nil,
Port: port,
})
t.Logf("Config:\n%s\n", string(cfg))

tempDir := t.TempDir()
require.NoError(t, os.WriteFile(filepath.Join(tempDir, "redpanda.yaml"), cfg, 0o644))

pool, err := dockertest.NewPool("")
require.NoError(t, err, "Failed to connect to Docker")
pool.MaxWait = maxWait

resource, err := pool.RunWithOptions(&dockertest.RunOptions{
runOpts := &dockertest.RunOptions{
Repository: redpandaImage,
Tag: redpandaVersion,
Entrypoint: []string{"/opt/redpanda/bin/redpanda"},
Cmd: []string{
"redpanda",
"start",
"--mode", "dev-container",
// kafka admin client will retrieve the advertised address from the broker
// so we need it to use the same port that is exposed on the container
"--advertise-kafka-addr", fmt.Sprintf("localhost:%d", hostPort),
"--redpanda-cfg",
"/etc/redpanda/redpanda.yaml",
"--unsafe-bypass-fsync=true",
"--reserve-memory=0M",
"--overprovisioned",
"--lock-memory=false",
"--default-log-level=error",
"--logger-log-level=kafka=info:request_auth=debug:security=debug",
},
ExposedPorts: []string{
"9092/tcp",
},
PortBindings: map[docker.Port][]docker.PortBinding{
"9092/tcp": {{HostIP: "localhost", HostPort: strconv.Itoa(hostPort)}},
"9092/tcp": {
{HostIP: "::1", HostPort: strconv.Itoa(port)},
{HostIP: "127.0.0.1", HostPort: strconv.Itoa(port)},
},
},
Mounts: []string{
fmt.Sprintf("%s:/certs", filepath.Join(testDataAbsPath, "certs")),
fmt.Sprintf("%s:/etc/redpanda", tempDir),
},
}, func(config *docker.HostConfig) {
}

var clientOpts []kgo.Opt
exposedPort := "9092/tcp"
if tlsConfig != nil {
clientOpts = append(clientOpts, kgo.DialTLSConfig(tlsConfig))
}

resource, err := pool.RunWithOptions(runOpts, func(config *docker.HostConfig) {
config.AutoRemove = true
})

require.NoError(t, err, "Failed to start container")

t.Cleanup(func() {
_ = pool.Purge(resource)
})

brokerDSN := fmt.Sprintf("localhost:%d", hostPort)
client, err := kgo.NewClient(kgo.SeedBrokers(brokerDSN))
brokerAddr := net.JoinHostPort("localhost", resource.GetPort(exposedPort))
clientOpts = append(clientOpts, kgo.SeedBrokers(brokerAddr))

if _, ok := os.LookupEnv("CERBOS_DEBUG_KAFKA"); ok {
ctx, cancelFunc := context.WithCancel(context.Background())
go func() {
if err := pool.Client.Logs(docker.LogsOptions{
Context: ctx,
Container: resource.Container.ID,
OutputStream: os.Stdout,
ErrorStream: os.Stderr,
Stdout: true,
Stderr: true,
Follow: true,
}); err != nil {
cancelFunc()
}
}()
t.Cleanup(cancelFunc)
}

client, err := kgo.NewClient(clientOpts...)
require.NoError(t, err)

require.NoError(t, pool.Retry(func() error {
return client.Ping(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

if err := client.Ping(ctx); err != nil {
t.Logf("Ping failed: %v", err)
return err
}

return nil
}), "Failed to connect to Kafka")

// create topic
_, err = kadm.NewClient(client).CreateTopic(context.Background(), 1, 1, nil, topic)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
t.Cleanup(cancel)
_, err = kadm.NewClient(client).CreateTopic(ctx, 1, 1, nil, topic)
require.NoError(t, err, "Failed to create Kafka topic")

return brokerDSN
return brokerAddr
}

func fetchKafkaTopic(t *testing.T, uri string, topic string, tlsEnabled bool) ([]*kgo.Record, error) {
Expand All @@ -331,7 +347,7 @@ func fetchKafkaTopic(t *testing.T, uri string, topic string, tlsEnabled bool) ([
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

tlsConfig, err := kafka.NewTLSConfig(ctx, duration, skipVerify, "testdata/valid/ca.crt", "testdata/valid/client/tls.crt", "testdata/valid/client/tls.key")
tlsConfig, err := kafka.NewTLSConfig(ctx, duration, skipVerify, "testdata/valid/certs/ca.crt", "testdata/valid/client/tls.crt", "testdata/valid/client/tls.key")
if err != nil {
return nil, err
}
Expand Down
File renamed without changes.
25 changes: 25 additions & 0 deletions internal/audit/kafka/testdata/valid/certs/rpk.yaml
@@ -0,0 +1,25 @@
redpanda:
data_directory: /var/lib/redpanda/data
seed_servers: []
kafka_api:
- address: 0.0.0.0
port: 9092
kafka_api_tls:
- key_file: /var/lib/redpanda/.config/rpk/tls.key
cert_file: /var/lib/redpanda/.config/rpk/tls.crt
truststore_file: /var/lib/redpanda/.config/rpk/ca.crt
enabled: true
require_client_auth: true
advertised_kafka_api:
- address: localhost
port: 65136
developer_mode: true
auto_create_topics_enabled: true
fetch_reads_debounce_timeout: 10
group_initial_rebalance_delay: 0
group_topic_partitions: 3
log_segment_size_min: 1
storage_min_free_bytes: 10485760
topic_partitions_per_shard: 1000
rpk:
overprovisioned: true
File renamed without changes.
File renamed without changes.
28 changes: 0 additions & 28 deletions internal/audit/kafka/testdata/valid/redpanda.yaml

This file was deleted.

30 changes: 30 additions & 0 deletions internal/audit/kafka/testdata/valid/redpanda/redpanda.yaml.gotmpl
@@ -0,0 +1,30 @@
redpanda:
data_directory: /var/lib/redpanda/data
kafka_api:
- name: api_listener
address: 0.0.0.0
port: 9092
{{- if .TLSEnabled }}
kafka_api_tls:
- name: api_listener
key_file: /certs/tls.key
cert_file: /certs/tls.crt
truststore_file: /certs/ca.crt
enabled: true
require_client_auth: true
{{- end }}
advertised_kafka_api:
- name: api_listener
address: localhost
port: {{ .Port }}
developer_mode: true
auto_create_topics_enabled: true
fetch_reads_debounce_timeout: 10
group_initial_rebalance_delay: 0
group_topic_partitions: 3
log_segment_size_min: 1
storage_min_free_bytes: 10485760
topic_partitions_per_shard: 1000
rpk:
coredump_dir: /var/lib/redpanda/coredump
overprovisioned: true

0 comments on commit 8f52e1e

Please sign in to comment.