Commit

fix: Kafka async produce context cancellation (#1516)
* Kafka integration to highlight async produce bug

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Fix async produce by removing ctx cancel

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Keep the linter happy

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Drop produce timeout now no longer required

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Migrate to dockertest

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

Fix lint issues

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Add integration tag

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Use helper for free port

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

---------

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>
rcrowe committed Apr 5, 2023
1 parent 5831556 commit f40a8a8
Showing 7 changed files with 193 additions and 21 deletions.
2 changes: 0 additions & 2 deletions docs/modules/configuration/pages/audit.adoc
@@ -36,7 +36,6 @@ audit:
     encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
     maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
     produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
-    produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
     topic: cerbos.audit.log # Required. Topic to write audit entries to.
 ----

@@ -143,6 +142,5 @@ audit:
     encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
     maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
     produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
-    produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
     topic: cerbos.audit.log # Required. Topic to write audit entries to.
 ----
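Both snippets above drop the produceTimeout key. For orientation, a minimal working configuration for this backend might look like the following; this is a sketch assembled from the keys exercised by the new integration test further down, not a verbatim excerpt from the docs:

audit:
  enabled: true
  backend: kafka
  kafka:
    brokers: ["localhost:9092"]
    topic: cerbos.audit.log
    produceSync: false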
1 change: 0 additions & 1 deletion docs/modules/configuration/partials/fullconfiguration.adoc
@@ -21,7 +21,6 @@ audit:
     encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
     maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
     produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
-    produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
     topic: cerbos.audit.log # Required. Topic to write audit entries to.
   local:
     advanced:
1 change: 1 addition & 0 deletions go.mod
@@ -67,6 +67,7 @@ require (
     github.com/tidwall/pretty v1.2.1
     github.com/tidwall/sjson v1.2.5
     github.com/twmb/franz-go v1.13.2
+    github.com/twmb/franz-go/pkg/kadm v1.8.0
     github.com/twmb/franz-go/plugin/kzap v1.1.2
     go.elastic.co/ecszap v1.0.1
     go.opencensus.io v0.24.0
2 changes: 2 additions & 0 deletions go.sum
@@ -2209,6 +2209,8 @@ github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqri
 github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM=
 github.com/twmb/franz-go v1.13.2 h1:jIdDoFiq8uP3Zrx6TZZTXpaXrv3bh1w3tV5mn/B+Gw8=
 github.com/twmb/franz-go v1.13.2/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk=
+github.com/twmb/franz-go/pkg/kadm v1.8.0 h1:vvKwZpxYn+VmM32v4mKkecHLKavZW+HcYLRKKxly5ZY=
+github.com/twmb/franz-go/pkg/kadm v1.8.0/go.mod h1:qUSM7pxoMCU1UNu5H4USE64ODcVmeG9LS96mysv1nu8=
 github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s=
 github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
 github.com/twmb/franz-go/plugin/kzap v1.1.2 h1:0arX5xJ0soUPX1LlDay6ZZoxuWkWk1lggQ5M/IgRXAE=
8 changes: 0 additions & 8 deletions internal/audit/kafka/conf.go
@@ -19,7 +19,6 @@ const (
     defaultAcknowledgement = AckAll
     defaultEncoding = EncodingJSON
     defaultCloseTimeout = 30 * time.Second
-    defaultProduceTimeout = 5 * time.Second
     defaultClientID = "cerbos"
     defaultMaxBufferedRecords = 250
 )
@@ -42,8 +41,6 @@ type Conf struct {
     MaxBufferedRecords int `yaml:"maxBufferedRecords" conf:",example=1000"`
     // ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
     ProduceSync bool `yaml:"produceSync" conf:",example=false"`
-    // ProduceTimeout sets how long to attempt to publish a message before giving up.
-    ProduceTimeout time.Duration `yaml:"produceTimeout" conf:",example=5s"`
 }

 func (c *Conf) Key() string {
@@ -54,7 +51,6 @@ func (c *Conf) SetDefaults() {
     c.Ack = defaultAcknowledgement
     c.Encoding = defaultEncoding
     c.CloseTimeout = defaultCloseTimeout
-    c.ProduceTimeout = defaultProduceTimeout
     c.ClientID = defaultClientID
     c.MaxBufferedRecords = defaultMaxBufferedRecords
 }
@@ -78,10 +74,6 @@ func (c *Conf) Validate() error {
         return errors.New("invalid close timeout")
     }

-    if c.ProduceTimeout <= 0 {
-        return errors.New("invalid produce timeout")
-    }
-
     if strings.TrimSpace(c.ClientID) == "" {
         return errors.New("invalid client ID")
     }
188 changes: 188 additions & 0 deletions internal/audit/kafka/kafka_test.go
@@ -0,0 +1,188 @@
// Copyright 2021-2023 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0

//go:build integration
// +build integration

package kafka_test

import (
    "context"
    "fmt"
    "strconv"
    "testing"
    "time"

    "github.com/ory/dockertest/v3"
    "github.com/ory/dockertest/v3/docker"
    "github.com/stretchr/testify/require"
    "github.com/twmb/franz-go/pkg/kadm"
    "github.com/twmb/franz-go/pkg/kgo"

    auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1"
    "github.com/cerbos/cerbos/internal/audit"
    _ "github.com/cerbos/cerbos/internal/audit/kafka"
    "github.com/cerbos/cerbos/internal/config"
    "github.com/cerbos/cerbos/internal/util"
)

const (
    redpandaImage   = "redpandadata/redpanda"
    redpandaVersion = "v23.1.5"

    defaultIntegrationTopic = "cerbos"
)

func TestSyncProduce(t *testing.T) {
    t.Parallel()

    ctx := context.Background()

    // setup kafka
    uri := newKafkaBroker(t, defaultIntegrationTopic)
    log, err := newLog(map[string]any{
        "audit": map[string]any{
            "enabled": true,
            "backend": "kafka",
            "kafka": map[string]any{
                "brokers":     []string{uri},
                "topic":       defaultIntegrationTopic,
                "produceSync": true,
            },
        },
    })
    require.NoError(t, err)

    // write audit log entries
    err = log.WriteAccessLogEntry(ctx, func() (*auditv1.AccessLogEntry, error) {
        return &auditv1.AccessLogEntry{
            CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA1",
        }, nil
    })
    require.NoError(t, err)

    err = log.WriteDecisionLogEntry(ctx, func() (*auditv1.DecisionLogEntry, error) {
        return &auditv1.DecisionLogEntry{
            CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA2",
        }, nil
    })
    require.NoError(t, err)

    // validate we see these entries in kafka
    records, err := fetchKafkaTopic(uri, defaultIntegrationTopic)
    require.NoError(t, err)
    require.Len(t, records, 2, "unexpected number of published audit log entries")
}

func TestAsyncProduce(t *testing.T) {
    t.Parallel()

    ctx := context.Background()

    // setup kafka
    uri := newKafkaBroker(t, defaultIntegrationTopic)
    log, err := newLog(map[string]any{
        "audit": map[string]any{
            "enabled": true,
            "backend": "kafka",
            "kafka": map[string]any{
                "brokers":     []string{uri},
                "topic":       defaultIntegrationTopic,
                "produceSync": false,
            },
        },
    })
    require.NoError(t, err)

    // write audit log entries
    err = log.WriteAccessLogEntry(ctx, func() (*auditv1.AccessLogEntry, error) {
        return &auditv1.AccessLogEntry{
            CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA1",
        }, nil
    })
    require.NoError(t, err)

    err = log.WriteDecisionLogEntry(ctx, func() (*auditv1.DecisionLogEntry, error) {
        return &auditv1.DecisionLogEntry{
            CallId: "01ARZ3NDEKTSV4RRFFQ69G5FA2",
        }, nil
    })
    require.NoError(t, err)

    // validate we see these entries in kafka, eventually
    require.Eventually(t, func() bool {
        records, err := fetchKafkaTopic(uri, defaultIntegrationTopic)
        require.NoError(t, err)
        return len(records) == 2
    }, 10*time.Second, 100*time.Millisecond, "expected to see audit log entries in kafka")
}

func newKafkaBroker(t *testing.T, topic string) string {
    t.Helper()

    hostPort, err := util.GetFreePort()
    require.NoError(t, err, "Unable to get free port")

    pool, err := dockertest.NewPool("")
    require.NoError(t, err, "Failed to connect to Docker")

    resource, err := pool.RunWithOptions(&dockertest.RunOptions{
        Repository: redpandaImage,
        Tag:        redpandaVersion,
        Cmd: []string{
            "redpanda",
            "start",
            "--mode", "dev-container",
            // kafka admin client will retrieve the advertised address from the broker
            // so we need it to use the same port that is exposed on the container
            "--advertise-kafka-addr", fmt.Sprintf("localhost:%d", hostPort),
        },
        ExposedPorts: []string{
            "9092/tcp",
        },
        PortBindings: map[docker.Port][]docker.PortBinding{
            "9092/tcp": {{HostIP: "localhost", HostPort: strconv.Itoa(hostPort)}},
        },
    }, func(config *docker.HostConfig) {
        config.AutoRemove = true
    })
    require.NoError(t, err, "Failed to start container")

    t.Cleanup(func() {
        _ = pool.Purge(resource)
    })

    brokerDSN := fmt.Sprintf("localhost:%d", hostPort)
    client, err := kgo.NewClient(kgo.SeedBrokers(brokerDSN))
    require.NoError(t, err)

    require.NoError(t, pool.Retry(func() error {
        return client.Ping(context.Background())
    }), "Failed to connect to Kafka")

    // create topic
    _, err = kadm.NewClient(client).CreateTopic(context.Background(), 1, 1, nil, topic)
    require.NoError(t, err, "Failed to create Kafka topic")

    return brokerDSN
}

func fetchKafkaTopic(uri, topic string) ([]*kgo.Record, error) {
    client, err := kgo.NewClient(kgo.SeedBrokers(uri))
    if err != nil {
        return nil, err
    }

    client.AddConsumeTopics(topic)

    fetches := client.PollFetches(context.Background())
    return fetches.Records(), fetches.Err()
}

func newLog(m map[string]any) (audit.Log, error) {
    cfg, err := config.WrapperFromMap(m)
    if err != nil {
        return nil, err
    }
    return audit.NewLogFromConf(context.Background(), cfg)
}
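Two notes on the test file above. First, the integration build tag means these tests are skipped by a plain go test run; an invocation along the lines of go test -tags=integration ./internal/audit/kafka/... is required (the exact command the project uses is an assumption). Second, util.GetFreePort is not part of this diff; a typical implementation of such a helper (a sketch, not necessarily Cerbos's) asks the OS for an ephemeral port and releases it for the container port mapping:

import "net"

// freePort binds to port 0 so the OS picks an unused TCP port, closes the
// listener, and returns the port number for reuse in the port binding.
// (Sketch only; util.GetFreePort in the cerbos repo may differ.)
func freePort() (int, error) {
    l, err := net.Listen("tcp", "localhost:0")
    if err != nil {
        return 0, err
    }
    defer l.Close()
    return l.Addr().(*net.TCPAddr).Port, nil
}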
12 changes: 2 additions & 10 deletions internal/audit/kafka/publisher.go
@@ -73,7 +73,6 @@ type Publisher struct {
     marshaller recordMarshaller
     sync bool
     closeTimeout time.Duration
-    produceTimeout time.Duration
 }

 func NewPublisher(conf *Conf, decisionFilter audit.DecisionLogEntryFilter) (*Publisher, error) {
@@ -110,7 +109,6 @@ func NewPublisher(conf *Conf, decisionFilter audit.DecisionLogEntryFilter) (*Pub
         marshaller: newMarshaller(conf.Encoding),
         sync: conf.ProduceSync,
         closeTimeout: conf.CloseTimeout,
-        produceTimeout: conf.ProduceTimeout,
     }, nil
 }

@@ -170,18 +168,12 @@ func (p *Publisher) WriteDecisionLogEntry(ctx context.Context, record audit.Deci

 func (p *Publisher) write(ctx context.Context, msg *kgo.Record) error {
     if p.sync {
-        produceCtx, produceCancel := context.WithTimeout(ctx, p.produceTimeout)
-        defer produceCancel()
-
-        return p.Client.ProduceSync(produceCtx, msg).FirstErr()
+        return p.Client.ProduceSync(ctx, msg).FirstErr()
     }

     // detach the context from the caller so the request can return
     // without cancelling any async kafka operations
-    produceCtx, produceCancel := context.WithTimeout(context.Background(), p.produceTimeout)
-    defer produceCancel()
-
-    p.Client.Produce(produceCtx, msg, func(r *kgo.Record, err error) {
+    p.Client.Produce(context.Background(), msg, func(r *kgo.Record, err error) {
         if err == nil {
             return
         }
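The hunk above is the heart of the fix. kgo.Client.Produce is asynchronous: it returns as soon as the record is buffered, so the old deferred produceCancel fired the moment write() returned, cancelling records that had not yet been flushed to the broker. Passing context.Background() detaches the in-flight record from the caller's lifetime. A minimal before/after sketch (client, msg, and callback are illustrative names):

// Before (buggy): Produce returns immediately, so the deferred cancel runs
// while the record is still buffered, and the record fails with a context error.
produceCtx, produceCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer produceCancel()
client.Produce(produceCtx, msg, callback)

// After (fixed): no deadline is attached to the record; delivery is governed
// by the client's own retry and timeout configuration.
client.Produce(context.Background(), msg, callback)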
