From f40a8a8c3b76463491afdbe1bb841750f71ef63f Mon Sep 17 00:00:00 2001
From: Rob Crowe <282571+rcrowe@users.noreply.github.com>
Date: Wed, 5 Apr 2023 13:19:28 +0100
Subject: [PATCH] fix: Kafka async produce context cancellation (#1516)

* Kafka integration to highlight async produce bug

Signed-off-by: Rob Crowe

* Fix async produce by removing ctx cancel

Signed-off-by: Rob Crowe

* Keep the linter happy

Signed-off-by: Rob Crowe

* Drop produce timeout now no longer required

Signed-off-by: Rob Crowe

* Migrate to dockertest

Signed-off-by: Rob Crowe

Fix lint issues

Signed-off-by: Rob Crowe

* Add integration tag

Signed-off-by: Rob Crowe

* Use helper for free port

Signed-off-by: Rob Crowe

---------

Signed-off-by: Rob Crowe
---
 docs/modules/configuration/pages/audit.adoc   |   2 -
 .../partials/fullconfiguration.adoc           |   1 -
 go.mod                                        |   1 +
 go.sum                                        |   2 +
 internal/audit/kafka/conf.go                  |   8 -
 internal/audit/kafka/kafka_test.go            | 188 ++++++++++++++++++
 internal/audit/kafka/publisher.go             |  12 +-
 7 files changed, 193 insertions(+), 21 deletions(-)
 create mode 100644 internal/audit/kafka/kafka_test.go

diff --git a/docs/modules/configuration/pages/audit.adoc b/docs/modules/configuration/pages/audit.adoc
index 64715df2a..bb0c48400 100644
--- a/docs/modules/configuration/pages/audit.adoc
+++ b/docs/modules/configuration/pages/audit.adoc
@@ -36,7 +36,6 @@ audit:
     encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
     maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
     produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
-    produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
     topic: cerbos.audit.log # Required. Topic to write audit entries to.
 ----
 
@@ -143,6 +142,5 @@ audit:
     encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
     maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
     produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
-    produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
     topic: cerbos.audit.log # Required. Topic to write audit entries to.
 ----
diff --git a/docs/modules/configuration/partials/fullconfiguration.adoc b/docs/modules/configuration/partials/fullconfiguration.adoc
index 2cd320dc9..e208a3772 100644
--- a/docs/modules/configuration/partials/fullconfiguration.adoc
+++ b/docs/modules/configuration/partials/fullconfiguration.adoc
@@ -21,7 +21,6 @@ audit:
     encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
     maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
     produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
-    produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
     topic: cerbos.audit.log # Required. Topic to write audit entries to.
   local:
     advanced:
diff --git a/go.mod b/go.mod
index 7b790b366..17fd8d380 100644
--- a/go.mod
+++ b/go.mod
@@ -67,6 +67,7 @@ require (
 	github.com/tidwall/pretty v1.2.1
 	github.com/tidwall/sjson v1.2.5
 	github.com/twmb/franz-go v1.13.2
+	github.com/twmb/franz-go/pkg/kadm v1.8.0
 	github.com/twmb/franz-go/plugin/kzap v1.1.2
 	go.elastic.co/ecszap v1.0.1
 	go.opencensus.io v0.24.0
diff --git a/go.sum b/go.sum
index ee2b9d6c2..1fa5c6c15 100644
--- a/go.sum
+++ b/go.sum
@@ -2209,6 +2209,8 @@ github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqri
 github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM=
 github.com/twmb/franz-go v1.13.2 h1:jIdDoFiq8uP3Zrx6TZZTXpaXrv3bh1w3tV5mn/B+Gw8=
 github.com/twmb/franz-go v1.13.2/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk=
+github.com/twmb/franz-go/pkg/kadm v1.8.0 h1:vvKwZpxYn+VmM32v4mKkecHLKavZW+HcYLRKKxly5ZY=
+github.com/twmb/franz-go/pkg/kadm v1.8.0/go.mod h1:qUSM7pxoMCU1UNu5H4USE64ODcVmeG9LS96mysv1nu8=
 github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s=
 github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
 github.com/twmb/franz-go/plugin/kzap v1.1.2 h1:0arX5xJ0soUPX1LlDay6ZZoxuWkWk1lggQ5M/IgRXAE=
diff --git a/internal/audit/kafka/conf.go b/internal/audit/kafka/conf.go
index 66148275f..f6f36bdf0 100644
--- a/internal/audit/kafka/conf.go
+++ b/internal/audit/kafka/conf.go
@@ -19,7 +19,6 @@ const (
 	defaultAcknowledgement    = AckAll
 	defaultEncoding           = EncodingJSON
 	defaultCloseTimeout       = 30 * time.Second
-	defaultProduceTimeout     = 5 * time.Second
 	defaultClientID           = "cerbos"
 	defaultMaxBufferedRecords = 250
 )
@@ -42,8 +41,6 @@ type Conf struct {
 	MaxBufferedRecords int `yaml:"maxBufferedRecords" conf:",example=1000"`
 	// ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
 	ProduceSync bool `yaml:"produceSync" conf:",example=false"`
-	// ProduceTimeout sets how long to attempt to publish a message before giving up.
-	ProduceTimeout time.Duration `yaml:"produceTimeout" conf:",example=5s"`
 }
 
 func (c *Conf) Key() string {
@@ -54,7 +51,6 @@ func (c *Conf) SetDefaults() {
 	c.Ack = defaultAcknowledgement
 	c.Encoding = defaultEncoding
 	c.CloseTimeout = defaultCloseTimeout
-	c.ProduceTimeout = defaultProduceTimeout
 	c.ClientID = defaultClientID
 	c.MaxBufferedRecords = defaultMaxBufferedRecords
 }
@@ -78,10 +74,6 @@
 		return errors.New("invalid close timeout")
 	}
 
-	if c.ProduceTimeout <= 0 {
-		return errors.New("invalid produce timeout")
-	}
-
 	if strings.TrimSpace(c.ClientID) == "" {
 		return errors.New("invalid client ID")
 	}
diff --git a/internal/audit/kafka/kafka_test.go b/internal/audit/kafka/kafka_test.go
new file mode 100644
index 000000000..52678a350
--- /dev/null
+++ b/internal/audit/kafka/kafka_test.go
@@ -0,0 +1,188 @@
+// Copyright 2021-2023 Zenauth Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+//go:build integration
+// +build integration
+
+package kafka_test
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/ory/dockertest/v3"
+	"github.com/ory/dockertest/v3/docker"
+	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kadm"
+	"github.com/twmb/franz-go/pkg/kgo"
+
+	auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1"
+	"github.com/cerbos/cerbos/internal/audit"
+	_ "github.com/cerbos/cerbos/internal/audit/kafka"
+	"github.com/cerbos/cerbos/internal/config"
+	"github.com/cerbos/cerbos/internal/util"
+)
+
+const (
+	redpandaImage   = "redpandadata/redpanda"
+	redpandaVersion = "v23.1.5"
+
+	defaultIntegrationTopic = "cerbos"
+)
+
+func TestSyncProduce(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+
+	// setup kafka
+	uri := newKafkaBroker(t, defaultIntegrationTopic)
+	log, err := newLog(map[string]any{
+		"audit": map[string]any{
+			"enabled": true,
+			"backend": "kafka",
+			"kafka": map[string]any{
+				"brokers":     []string{uri},
+				"topic":       defaultIntegrationTopic,
+				"produceSync": true,
+			},
+		},
+	})
+	require.NoError(t, err)
+
+	// write audit log entries
+	err = log.WriteAccessLogEntry(ctx, func() (*auditv1.AccessLogEntry, error) {
+		return &auditv1.AccessLogEntry{
+			CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA1",
+		}, nil
+	})
+	require.NoError(t, err)
+
+	err = log.WriteDecisionLogEntry(ctx, func() (*auditv1.DecisionLogEntry, error) {
+		return &auditv1.DecisionLogEntry{
+			CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA2",
+		}, nil
+	})
+	require.NoError(t, err)
+
+	// validate we see these entries in kafka
+	records, err := fetchKafkaTopic(uri, defaultIntegrationTopic)
+	require.NoError(t, err)
+	require.Len(t, records, 2, "unexpected number of published audit log entries")
+}
+
+func TestAsyncProduce(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+
+	// setup kafka
+	uri := newKafkaBroker(t, defaultIntegrationTopic)
+	log, err := newLog(map[string]any{
+		"audit": map[string]any{
+			"enabled": true,
+			"backend": "kafka",
+			"kafka": map[string]any{
+				"brokers":     []string{uri},
+				"topic":       defaultIntegrationTopic,
+				"produceSync": false,
+			},
+		},
+	})
+	require.NoError(t, err)
+
+	// write audit log entries
+	err = log.WriteAccessLogEntry(ctx, func() (*auditv1.AccessLogEntry, error) {
+		return &auditv1.AccessLogEntry{
+			CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA1",
+		}, nil
+	})
+	require.NoError(t, err)
+
+	err = log.WriteDecisionLogEntry(ctx, func() (*auditv1.DecisionLogEntry, error) {
+		return &auditv1.DecisionLogEntry{
+			CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA2",
+		}, nil
+	})
+	require.NoError(t, err)
+
+	// validate we see these entries in kafka, eventually
+	require.Eventually(t, func() bool {
+		records, err := fetchKafkaTopic(uri, defaultIntegrationTopic)
+		require.NoError(t, err)
+		return len(records) == 2
+	}, 10*time.Second, 100*time.Millisecond, "expected to see audit log entries in kafka")
+}
+
+func newKafkaBroker(t *testing.T, topic string) string {
+	t.Helper()
+
+	hostPort, err := util.GetFreePort()
+	require.NoError(t, err, "Unable to get free port")
+
+	pool, err := dockertest.NewPool("")
+	require.NoError(t, err, "Failed to connect to Docker")
+
+	resource, err := pool.RunWithOptions(&dockertest.RunOptions{
+		Repository: redpandaImage,
+		Tag:        redpandaVersion,
+		Cmd: []string{
+			"redpanda",
+			"start",
+			"--mode", "dev-container",
+			// kafka admin client will retrieve the advertised address from the broker
+			// so we need it to use the same port that is exposed on the container
+			"--advertise-kafka-addr", fmt.Sprintf("localhost:%d", hostPort),
+		},
+		ExposedPorts: []string{
+			"9092/tcp",
+		},
+		PortBindings: map[docker.Port][]docker.PortBinding{
+			"9092/tcp": {{HostIP: "localhost", HostPort: strconv.Itoa(hostPort)}},
+		},
+	}, func(config *docker.HostConfig) {
+		config.AutoRemove = true
+	})
+	require.NoError(t, err, "Failed to start container")
+
+	t.Cleanup(func() {
+		_ = pool.Purge(resource)
+	})
+
+	brokerDSN := fmt.Sprintf("localhost:%d", hostPort)
+	client, err := kgo.NewClient(kgo.SeedBrokers(brokerDSN))
+	require.NoError(t, err)
+
+	require.NoError(t, pool.Retry(func() error {
+		return client.Ping(context.Background())
+	}), "Failed to connect to Kafka")
+
+	// create topic
+	_, err = kadm.NewClient(client).CreateTopic(context.Background(), 1, 1, nil, topic)
+	require.NoError(t, err, "Failed to create Kafka topic")
+
+	return brokerDSN
+}
+
+func fetchKafkaTopic(uri, topic string) ([]*kgo.Record, error) {
+	client, err := kgo.NewClient(kgo.SeedBrokers(uri))
+	if err != nil {
+		return nil, err
+	}
+
+	client.AddConsumeTopics(topic)
+
+	fetches := client.PollFetches(context.Background())
+	return fetches.Records(), fetches.Err()
+}
+
+func newLog(m map[string]any) (audit.Log, error) {
+	cfg, err := config.WrapperFromMap(m)
+	if err != nil {
+		return nil, err
+	}
+	return audit.NewLogFromConf(context.Background(), cfg)
+}
diff --git a/internal/audit/kafka/publisher.go b/internal/audit/kafka/publisher.go
index daad12104..bed8bd891 100644
--- a/internal/audit/kafka/publisher.go
+++ b/internal/audit/kafka/publisher.go
@@ -73,7 +73,6 @@ type Publisher struct {
 	marshaller     recordMarshaller
 	sync           bool
 	closeTimeout   time.Duration
-	produceTimeout time.Duration
 }
 
 func NewPublisher(conf *Conf, decisionFilter audit.DecisionLogEntryFilter) (*Publisher, error) {
@@ -110,7 +109,6 @@
 		marshaller:     newMarshaller(conf.Encoding),
 		sync:           conf.ProduceSync,
 		closeTimeout:   conf.CloseTimeout,
-		produceTimeout: conf.ProduceTimeout,
 	}, nil
 }
 
@@ -170,18 +168,12 @@ func (p *Publisher) WriteDecisionLogEntry(ctx context.Context, record audit.Deci
 func (p *Publisher) write(ctx context.Context, msg *kgo.Record) error {
 	if p.sync {
-		produceCtx, produceCancel := context.WithTimeout(ctx, p.produceTimeout)
-		defer produceCancel()
-
-		return p.Client.ProduceSync(produceCtx, msg).FirstErr()
+		return p.Client.ProduceSync(ctx, msg).FirstErr()
 	}
 
 	// detach the context from the caller so the request can return
 	// without cancelling any async kafka operations
-	produceCtx, produceCancel := context.WithTimeout(context.Background(), p.produceTimeout)
-	defer produceCancel()
-
-	p.Client.Produce(produceCtx, msg, func(r *kgo.Record, err error) {
+	p.Client.Produce(context.Background(), msg, func(r *kgo.Record, err error) {
 		if err == nil {
 			return
 		}
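
For context on the write() change: franz-go's Client.Produce is asynchronous, so the context handed to it has to outlive the caller. The old code deferred produceCancel(), which fired as soon as write() returned, and a record still sitting in the client's buffer at that point could be failed with context.Canceled instead of being flushed to the broker. The synchronous path is unaffected, since ProduceSync blocks until the record is acknowledged, the caller's own context remains the right bound there; and with no detached context left to cancel, the produceTimeout setting had nothing to guard, hence its removal. Below is a minimal sketch of the two async shapes. It is not part of the patch; the function names produceBroken and produceFixed are hypothetical, and a connected *kgo.Client is assumed.

package kafkasketch

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// produceBroken mirrors the old async path. The deferred cancel fires the
// moment this function returns, but Produce only buffers the record; if it
// has not been flushed yet, franz-go can fail it with context.Canceled and
// the audit entry is dropped.
func produceBroken(client *kgo.Client, msg *kgo.Record, timeout time.Duration) {
	produceCtx, produceCancel := context.WithTimeout(context.Background(), timeout)
	defer produceCancel() // cancels before the async flush has happened

	client.Produce(produceCtx, msg, func(_ *kgo.Record, err error) {
		if err != nil {
			log.Printf("produce failed: %v", err) // typically context.Canceled
		}
	})
}

// produceFixed mirrors the new async path: a detached, never-cancelled
// context lets the client flush the record in its own time.
func produceFixed(client *kgo.Client, msg *kgo.Record) {
	client.Produce(context.Background(), msg, func(_ *kgo.Record, err error) {
		if err != nil {
			log.Printf("produce failed: %v", err)
		}
	})
}

With the fixed shape, delivery of buffered records is bounded by the client's own request timeouts and by whatever flushing the publisher does at shutdown within its closeTimeout window, rather than by the lifetime of the incoming request.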