From 4a440e925ec99c247cdb3a854932e14f44652994 Mon Sep 17 00:00:00 2001 From: Rob Crowe <282571+rcrowe@users.noreply.github.com> Date: Mon, 27 Mar 2023 14:56:35 +0100 Subject: [PATCH] feat: Kafka audit log (#1499) * Kafka audit log backend Signed-off-by: Rob Crowe * Support JSON & Proto encoding Signed-off-by: Rob Crowe * Async publishing Signed-off-by: Rob Crowe * Generate missing configuration Signed-off-by: Rob Crowe * Fix broker config example Signed-off-by: Rob Crowe * Include audi kind as header Signed-off-by: Rob Crowe * Test helpers use correct line nums Signed-off-by: Rob Crowe * Fix field alignment Signed-off-by: Rob Crowe * Default to async publishing Signed-off-by: Rob Crowe * Flush buffered messages Signed-off-by: Rob Crowe Ignore error for now Signed-off-by: Rob Crowe * Enable debug logger on kafka client Signed-off-by: Rob Crowe * Configure client ID Signed-off-by: Rob Crowe * Configure max buffered Signed-off-by: Rob Crowe * Configure required acks Signed-off-by: Rob Crowe Disable idempotency when not ack all Signed-off-by: Rob Crowe * Drop 2nd item to fix docs issue Signed-off-by: Rob Crowe * YAML parsing converts to duration Signed-off-by: Rob Crowe * Remove header allocations Signed-off-by: Rob Crowe * Refactor marshalling & various cleanup Signed-off-by: Rob Crowe Regen docs Signed-off-by: Rob Crowe * Return error when flushing on close Signed-off-by: Rob Crowe * Not required, formatAck catches invalid Signed-off-by: Rob Crowe * Drop type switching and embed proto.Message Signed-off-by: Rob Crowe --------- Signed-off-by: Rob Crowe --- .../partials/fullconfiguration.adoc | 9 + go.mod | 6 +- go.sum | 12 +- internal/audit/kafka/conf.go | 99 ++++++++ internal/audit/kafka/marshal_test.go | 53 ++++ internal/audit/kafka/publisher.go | 231 ++++++++++++++++++ internal/audit/kafka/publisher_test.go | 200 +++++++++++++++ internal/server/server.go | 2 + 8 files changed, 609 insertions(+), 3 deletions(-) create mode 100644 internal/audit/kafka/conf.go create mode 100644 internal/audit/kafka/marshal_test.go create mode 100644 internal/audit/kafka/publisher.go create mode 100644 internal/audit/kafka/publisher_test.go diff --git a/docs/modules/configuration/partials/fullconfiguration.adoc b/docs/modules/configuration/partials/fullconfiguration.adoc index 993f9c847..6b1911ffb 100644 --- a/docs/modules/configuration/partials/fullconfiguration.adoc +++ b/docs/modules/configuration/partials/fullconfiguration.adoc @@ -13,6 +13,15 @@ audit: includeMetadataKeys: ['content-type'] # IncludeMetadataKeys defines which gRPC request metadata keys should be included in the audit logs. file: path: /path/to/file.log # Path to the log file to use as output. The special values stdout and stderr can be used to write to stdout or stderr respectively. + kafka: + ack: all # Required acknowledgement for messages, accepts none, leader or the default all. Idempotency disabled when not all + brokers: ['localhost:9092'] # Seed brokers Kafka client will connect to + clientID: cerbos # Identifier sent with all requests to Kafka + encoding: protobuf # Data format written to Kafka, accepts either json (default) or protobuf + flushTimeout: 30s # Timeout for flushing messages to Kafka + maxBufferedLogs: 1000 # MaxBufferedLogs sets the max amount of logs the client will buffer before blocking + produceSync: true # Increase reliability by stopping asynchronous publishing at the cost of reduced performance + topic: cerbos.audit.log # Name of the topic audit entries are written to local: advanced: bufferSize: 256 diff --git a/go.mod b/go.mod index 841d3c873..5f0c79576 100644 --- a/go.mod +++ b/go.mod @@ -66,6 +66,8 @@ require ( github.com/tidwall/gjson v1.14.4 github.com/tidwall/pretty v1.2.1 github.com/tidwall/sjson v1.2.5 + github.com/twmb/franz-go v1.13.0 + github.com/twmb/franz-go/plugin/kzap v1.1.2 go.elastic.co/ecszap v1.0.1 go.opencensus.io v0.24.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0 @@ -182,7 +184,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/compress v1.15.15 // indirect + github.com/klauspost/compress v1.16.3 // indirect github.com/klauspost/cpuid/v2 v2.2.3 // indirect github.com/lestrrat-go/blackmagic v1.0.1 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect @@ -208,6 +210,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc2 // indirect github.com/opencontainers/runc v1.1.2 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -227,6 +230,7 @@ require ( github.com/stoewer/go-strcase v1.2.0 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/tidwall/match v1.1.1 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect diff --git a/go.sum b/go.sum index 1fd3a9868..1452ec606 100644 --- a/go.sum +++ b/go.sum @@ -1626,8 +1626,8 @@ github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8 github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= -github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= +github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= +github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -1943,6 +1943,8 @@ github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4= github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFzPPsI= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= @@ -2205,6 +2207,12 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM= +github.com/twmb/franz-go v1.13.0 h1:J4VyTXVlOhiCDCXS56ut2ZRAylaimPXnIqtCq9Wlfbw= +github.com/twmb/franz-go v1.13.0/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk= +github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s= +github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/franz-go/plugin/kzap v1.1.2 h1:0arX5xJ0soUPX1LlDay6ZZoxuWkWk1lggQ5M/IgRXAE= +github.com/twmb/franz-go/plugin/kzap v1.1.2/go.mod h1:53Cl9Uz1pbdOPDvUISIxLrZIWSa2jCuY1bTMauRMBmo= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= diff --git a/internal/audit/kafka/conf.go b/internal/audit/kafka/conf.go new file mode 100644 index 000000000..f84c6ccc5 --- /dev/null +++ b/internal/audit/kafka/conf.go @@ -0,0 +1,99 @@ +// Copyright 2021-2023 Zenauth Ltd. +// SPDX-License-Identifier: Apache-2.0 + +package kafka + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/cerbos/cerbos/internal/audit" + "github.com/twmb/franz-go/pkg/kgo" +) + +const confKey = audit.ConfKey + ".kafka" + +const ( + defaultAcknowledgement = AckAll + defaultEncoding = EncodingJSON + defaultFlushTimeout = 30 * time.Second + defaultClientID = "cerbos" + defaultMaxBufferedLogs = 250 +) + +// Conf is optional configuration for kafka Audit. +type Conf struct { + // Required acknowledgement for messages, accepts none, leader or the default all. Idempotency disabled when not all + Ack string `yaml:"ack" conf:",example=all"` + // Name of the topic audit entries are written to + Topic string `yaml:"topic" conf:",example=cerbos.audit.log"` + // Data format written to Kafka, accepts either json (default) or protobuf + Encoding Encoding `yaml:"encoding" conf:",example=protobuf"` + // Identifier sent with all requests to Kafka + ClientID string `yaml:"clientID" conf:",example=cerbos"` + // Seed brokers Kafka client will connect to + Brokers []string `yaml:"brokers" conf:",example=['localhost:9092']"` + // Timeout for flushing messages to Kafka + FlushTimeout time.Duration `yaml:"flushTimeout" conf:",example=30s"` + // MaxBufferedLogs sets the max amount of logs the client will buffer before blocking + MaxBufferedLogs int `yaml:"maxBufferedLogs" conf:",example=1000"` + // Increase reliability by stopping asynchronous publishing at the cost of reduced performance + ProduceSync bool `yaml:"produceSync" conf:",example=true"` +} + +func (c *Conf) Key() string { + return confKey +} + +func (c *Conf) SetDefaults() { + c.Ack = defaultAcknowledgement + c.Encoding = defaultEncoding + c.FlushTimeout = defaultFlushTimeout + c.ClientID = defaultClientID + c.MaxBufferedLogs = defaultMaxBufferedLogs +} + +func (c *Conf) Validate() error { + if _, err := formatAck(c.Ack); err != nil { + return err + } + + if strings.TrimSpace(c.Topic) == "" { + return errors.New("invalid topic") + } + + switch c.Encoding { + case EncodingJSON, EncodingProtobuf: + default: + return fmt.Errorf("invalid encoding format: %s", c.Encoding) + } + + if c.FlushTimeout <= 0 { + return errors.New("invalid flush timeout") + } + + if strings.TrimSpace(c.ClientID) == "" { + return errors.New("invalid client ID") + } + + if len(c.Brokers) == 0 { + return errors.New("empty brokers") + } + + return nil +} + +func formatAck(ack string) (kgo.Acks, error) { + switch ack { + case AckNone: + return kgo.NoAck(), nil + case AckAll: + return kgo.AllISRAcks(), nil + case AckLeader: + return kgo.LeaderAck(), nil + default: + return kgo.NoAck(), fmt.Errorf("invalid ack value: %s", ack) + } +} diff --git a/internal/audit/kafka/marshal_test.go b/internal/audit/kafka/marshal_test.go new file mode 100644 index 000000000..aa74a64bf --- /dev/null +++ b/internal/audit/kafka/marshal_test.go @@ -0,0 +1,53 @@ +// Copyright 2021-2023 Zenauth Ltd. +// SPDX-License-Identifier: Apache-2.0 + +package kafka + +import ( + "fmt" + "testing" + + auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1" +) + +var encoding = []Encoding{EncodingJSON, EncodingProtobuf} + +func BenchmarkRecordMarshaller_AccessLog(b *testing.B) { + for _, enc := range encoding { + b.Run(fmt.Sprintf("encoding_%s", enc), func(b *testing.B) { + m := newMarshaller(enc) + rec := &auditv1.AccessLogEntry{ + CallId: "01ARZ3NDEKTSV4RRFFQ69G5FAV", + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + if _, err := m.Marshal(rec, KindAccess); err != nil { + b.Fatal(err) + } + } + }) + } +} + +func BenchmarkRecordMarshaller_DecisionLog(b *testing.B) { + for _, enc := range encoding { + b.Run(fmt.Sprintf("encoding_%s", enc), func(b *testing.B) { + m := newMarshaller(enc) + rec := &auditv1.DecisionLogEntry{ + CallId: "01ARZ3NDEKTSV4RRFFQ69G5FAV", + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + if _, err := m.Marshal(rec, KindDecision); err != nil { + b.Fatal(err) + } + } + }) + } +} diff --git a/internal/audit/kafka/publisher.go b/internal/audit/kafka/publisher.go new file mode 100644 index 000000000..caffbb5f2 --- /dev/null +++ b/internal/audit/kafka/publisher.go @@ -0,0 +1,231 @@ +// Copyright 2021-2023 Zenauth Ltd. +// SPDX-License-Identifier: Apache-2.0 + +package kafka + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/plugin/kzap" + "go.uber.org/zap" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + + "github.com/cerbos/cerbos/internal/audit" + "github.com/cerbos/cerbos/internal/config" +) + +const Backend = "kafka" + +const ( + AckNone = "none" + AckAll = "all" + AckLeader = "leader" + + HeaderKeyEncoding = "cerbos.audit.encoding" + HeaderKeyKind = "cerbos.audit.kind" +) + +type Encoding string + +const ( + EncodingJSON Encoding = "json" + EncodingProtobuf Encoding = "protobuf" +) + +type Kind []byte + +var ( + KindAccess Kind = []byte("access") + KindDecision Kind = []byte("decision") +) + +func init() { + audit.RegisterBackend(Backend, func(ctx context.Context, confW *config.Wrapper, decisionFilter audit.DecisionLogEntryFilter) (audit.Log, error) { + conf := new(Conf) + if err := confW.GetSection(conf); err != nil { + return nil, fmt.Errorf("failed to read kafka audit log configuration: %w", err) + } + + return NewPublisher(conf, decisionFilter) + }) +} + +type Client interface { + Close() + Flush(context.Context) error + Produce(context.Context, *kgo.Record, func(*kgo.Record, error)) + ProduceSync(context.Context, ...*kgo.Record) kgo.ProduceResults +} + +type Publisher struct { + Client Client + decisionFilter audit.DecisionLogEntryFilter + marshaller recordMarshaller + sync bool + flushTimeout time.Duration +} + +func NewPublisher(conf *Conf, decisionFilter audit.DecisionLogEntryFilter) (*Publisher, error) { + clientOpts := []kgo.Opt{ + kgo.ClientID(conf.ClientID), + kgo.SeedBrokers(conf.Brokers...), + kgo.DefaultProduceTopic(conf.Topic), + kgo.MaxBufferedRecords(conf.MaxBufferedLogs), + } + + if _, ok := os.LookupEnv("CERBOS_DEBUG_KAFKA"); ok { + clientOpts = append(clientOpts, kgo.WithLogger( + kzap.New(zap.L().Named("kafka"), kzap.Level(kgo.LogLevelDebug)), + )) + } + + ack, err := formatAck(conf.Ack) + if err != nil { + return nil, err + } + clientOpts = append(clientOpts, kgo.RequiredAcks(ack)) + if conf.Ack != AckAll { + clientOpts = append(clientOpts, kgo.DisableIdempotentWrite()) + } + + client, err := kgo.NewClient(clientOpts...) + if err != nil { + return nil, err + } + + return &Publisher{ + Client: client, + decisionFilter: decisionFilter, + marshaller: newMarshaller(conf.Encoding), + sync: conf.ProduceSync, + flushTimeout: conf.FlushTimeout, + }, nil +} + +func (p *Publisher) Close() error { + flushCtx, flushCancel := context.WithTimeout(context.Background(), p.flushTimeout) + defer flushCancel() + if err := p.Client.Flush(flushCtx); err != nil { + return err + } + + p.Client.Close() + return nil +} + +func (p *Publisher) Backend() string { + return Backend +} + +func (p *Publisher) Enabled() bool { + return true +} + +func (p *Publisher) WriteAccessLogEntry(ctx context.Context, record audit.AccessLogEntryMaker) error { + rec, err := record() + if err != nil { + return err + } + + msg, err := p.marshaller.Marshal(rec, KindAccess) + if err != nil { + return err + } + + return p.write(ctx, msg) +} + +func (p *Publisher) WriteDecisionLogEntry(ctx context.Context, record audit.DecisionLogEntryMaker) error { + rec, err := record() + if err != nil { + return err + } + + if p.decisionFilter != nil { + rec = p.decisionFilter(rec) + if rec == nil { + return nil + } + } + + msg, err := p.marshaller.Marshal(rec, KindDecision) + if err != nil { + return err + } + + return p.write(ctx, msg) +} + +func (p *Publisher) write(ctx context.Context, msg *kgo.Record) error { + if p.sync { + return p.Client.ProduceSync(ctx, msg).FirstErr() + } + + p.Client.Produce(ctx, msg, func(r *kgo.Record, err error) { + if err != nil { + // TODO: Handle via interceptor + ctxzap.Extract(ctx).Warn("failed to write audit log entry", zap.Error(err)) + } + }) + return nil +} + +func newMarshaller(enc Encoding) recordMarshaller { + return recordMarshaller{ + encoding: enc, + encodingKey: []byte(enc), + } +} + +type recordMarshaller struct { + encoding Encoding + encodingKey []byte +} + +type auditEntry interface { + proto.Message + GetCallId() string + MarshalVT() ([]byte, error) +} + +func (m recordMarshaller) Marshal(entry auditEntry, kind Kind) (*kgo.Record, error) { + partitionKey, err := audit.ID(entry.GetCallId()).Repr() + if err != nil { + return nil, fmt.Errorf("invalid call ID: %w", err) + } + + var payload []byte + switch m.encoding { + default: + return nil, fmt.Errorf("invalid encoding format: %s", m.encoding) + case EncodingJSON: + payload, err = protojson.Marshal(entry) + case EncodingProtobuf: + payload, err = entry.MarshalVT() + } + + if err != nil { + return nil, fmt.Errorf("failed to marshal entry: %w", err) + } + + return &kgo.Record{ + Key: partitionKey.Bytes(), + Value: payload, + Headers: []kgo.RecordHeader{ + { + Key: HeaderKeyEncoding, + Value: m.encodingKey, + }, + { + Key: HeaderKeyKind, + Value: kind, + }, + }, + }, nil +} diff --git a/internal/audit/kafka/publisher_test.go b/internal/audit/kafka/publisher_test.go new file mode 100644 index 000000000..4e9900c01 --- /dev/null +++ b/internal/audit/kafka/publisher_test.go @@ -0,0 +1,200 @@ +// Copyright 2021-2023 Zenauth Ltd. +// SPDX-License-Identifier: Apache-2.0 + +package kafka_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" + "google.golang.org/protobuf/encoding/protojson" + + auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1" + "github.com/cerbos/cerbos/internal/audit" + "github.com/cerbos/cerbos/internal/audit/kafka" +) + +var id = audit.ID("01ARZ3NDEKTSV4RRFFQ69G5FAV") + +func TestWriteAccessLogEntry(t *testing.T) { + t.Run("json encoded message", func(t *testing.T) { + publisher, kafkaClient := newPublisher(t, kafka.Conf{ + Encoding: kafka.EncodingJSON, + }) + + err := publisher.WriteAccessLogEntry(context.Background(), func() (*auditv1.AccessLogEntry, error) { + return &auditv1.AccessLogEntry{ + CallId: string(id), + }, nil + }) + require.NoError(t, err) + + expectPartitionKey(t, kafkaClient) + expectKind(t, kafkaClient, kafka.KindAccess) + expectJSON(t, kafkaClient) + }) + + t.Run("protobuf encoded message", func(t *testing.T) { + publisher, kafkaClient := newPublisher(t, kafka.Conf{ + Encoding: kafka.EncodingProtobuf, + }) + + err := publisher.WriteAccessLogEntry(context.Background(), func() (*auditv1.AccessLogEntry, error) { + return &auditv1.AccessLogEntry{ + CallId: string(id), + }, nil + }) + require.NoError(t, err) + + expectPartitionKey(t, kafkaClient) + expectKind(t, kafkaClient, kafka.KindAccess) + expectProtobuf(t, kafkaClient) + }) + + t.Run("sync message", func(t *testing.T) { + publisher, kafkaClient := newPublisher(t, kafka.Conf{ + Encoding: kafka.EncodingJSON, + ProduceSync: true, + }) + + err := publisher.WriteAccessLogEntry(context.Background(), func() (*auditv1.AccessLogEntry, error) { + return &auditv1.AccessLogEntry{ + CallId: string(id), + }, nil + }) + require.NoError(t, err) + + expectPartitionKey(t, kafkaClient) + expectKind(t, kafkaClient, kafka.KindAccess) + expectJSON(t, kafkaClient) + }) +} + +func TestWriteDecisionLogEntry(t *testing.T) { + t.Run("json encoded message", func(t *testing.T) { + publisher, kafkaClient := newPublisher(t, kafka.Conf{ + Encoding: kafka.EncodingJSON, + }) + + err := publisher.WriteDecisionLogEntry(context.Background(), func() (*auditv1.DecisionLogEntry, error) { + return &auditv1.DecisionLogEntry{ + CallId: string(id), + }, nil + }) + require.NoError(t, err) + + expectPartitionKey(t, kafkaClient) + expectKind(t, kafkaClient, kafka.KindDecision) + expectJSON(t, kafkaClient) + }) + + t.Run("protobuf encoded message", func(t *testing.T) { + publisher, kafkaClient := newPublisher(t, kafka.Conf{ + Encoding: kafka.EncodingProtobuf, + }) + + err := publisher.WriteDecisionLogEntry(context.Background(), func() (*auditv1.DecisionLogEntry, error) { + return &auditv1.DecisionLogEntry{ + CallId: string(id), + }, nil + }) + require.NoError(t, err) + + expectPartitionKey(t, kafkaClient) + expectKind(t, kafkaClient, kafka.KindDecision) + expectProtobuf(t, kafkaClient) + }) +} + +func expectPartitionKey(t *testing.T, kafkaClient *mockClient) { + t.Helper() + + expectedID, err := id.Repr() + require.NoError(t, err) + assert.Equal(t, expectedID.Bytes(), kafkaClient.Records[0].Key) +} + +func expectKind(t *testing.T, kafkaClient *mockClient, kind []byte) { + t.Helper() + + assert.Equal(t, kind, getHeader(kafkaClient.Records[0].Headers, kafka.HeaderKeyKind)) +} + +func expectJSON(t *testing.T, kafkaClient *mockClient) { + t.Helper() + + // expected encoding + assert.Equal(t, []byte(kafka.EncodingJSON), getHeader(kafkaClient.Records[0].Headers, kafka.HeaderKeyEncoding)) + + // decode json + var entry auditv1.AccessLogEntry + err := protojson.Unmarshal(kafkaClient.Records[0].Value, &entry) + require.NoError(t, err) + assert.Equal(t, entry.CallId, string(id)) +} + +func expectProtobuf(t *testing.T, kafkaClient *mockClient) { + t.Helper() + + // expected encoding + assert.Equal(t, []byte(kafka.EncodingProtobuf), getHeader(kafkaClient.Records[0].Headers, kafka.HeaderKeyEncoding)) + + // decode protobuf + var entry auditv1.AccessLogEntry + err := entry.UnmarshalVT(kafkaClient.Records[0].Value) + require.NoError(t, err) + assert.Equal(t, entry.CallId, string(id)) +} + +func newPublisher(t *testing.T, cfg kafka.Conf) (*kafka.Publisher, *mockClient) { + t.Helper() + + config := &kafka.Conf{} + config.SetDefaults() + config.Brokers = []string{"localhost:9092"} + config.Encoding = cfg.Encoding + config.ProduceSync = cfg.ProduceSync + + publisher, err := kafka.NewPublisher(config, nil) + require.NoError(t, err) + + kafkaClient := &mockClient{} + publisher.Client = kafkaClient + + return publisher, kafkaClient +} + +func getHeader(headers []kgo.RecordHeader, key string) []byte { + for _, h := range headers { + if h.Key == key { + return h.Value + } + } + return nil +} + +type mockClient struct { + Records []*kgo.Record +} + +func (m *mockClient) Reset() { + m.Records = nil +} + +func (m *mockClient) Close() {} + +func (m *mockClient) Flush(_ context.Context) error { + return nil +} + +func (m *mockClient) Produce(_ context.Context, record *kgo.Record, _ func(*kgo.Record, error)) { + m.Records = append(m.Records, record) +} + +func (m *mockClient) ProduceSync(_ context.Context, records ...*kgo.Record) kgo.ProduceResults { + m.Records = append(m.Records, records...) + return kgo.ProduceResults{} +} diff --git a/internal/server/server.go b/internal/server/server.go index 0129fff60..20c0ef6b6 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -54,6 +54,8 @@ import ( _ "github.com/cerbos/cerbos/internal/audit/local" // Import to register the file audit log backend. _ "github.com/cerbos/cerbos/internal/audit/file" + // Import to register the kafka audit log backend. + _ "github.com/cerbos/cerbos/internal/audit/kafka" "github.com/cerbos/cerbos/internal/auxdata" "github.com/cerbos/cerbos/internal/compile" "github.com/cerbos/cerbos/internal/engine"