Skip to content

Commit

Permalink
feat: Kafka audit log (#1499)
Browse files Browse the repository at this point in the history
* Kafka audit log backend

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Support JSON & Proto encoding

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Async publishing

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Generate missing configuration

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Fix broker config example

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Include audi kind as header

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Test helpers use correct line nums

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Fix field alignment

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Default to async publishing

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Flush buffered messages

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

Ignore error for now

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Enable debug logger on kafka client

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Configure client ID

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Configure max buffered

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Configure required acks

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

Disable idempotency when not ack all

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Drop 2nd item to fix docs issue

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* YAML parsing converts to duration

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Remove header allocations

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Refactor marshalling & various cleanup

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

Regen docs

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Return error when flushing on close

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Not required, formatAck catches invalid

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Drop type switching and embed proto.Message

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

---------

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>
  • Loading branch information
rcrowe committed Mar 27, 2023
1 parent eb0d3da commit 4a440e9
Show file tree
Hide file tree
Showing 8 changed files with 609 additions and 3 deletions.
9 changes: 9 additions & 0 deletions docs/modules/configuration/partials/fullconfiguration.adoc
Expand Up @@ -13,6 +13,15 @@ audit:
includeMetadataKeys: ['content-type'] # IncludeMetadataKeys defines which gRPC request metadata keys should be included in the audit logs.
file:
path: /path/to/file.log # Path to the log file to use as output. The special values stdout and stderr can be used to write to stdout or stderr respectively.
kafka:
ack: all # Required acknowledgement for messages, accepts none, leader or the default all. Idempotency disabled when not all
brokers: ['localhost:9092'] # Seed brokers Kafka client will connect to
clientID: cerbos # Identifier sent with all requests to Kafka
encoding: protobuf # Data format written to Kafka, accepts either json (default) or protobuf
flushTimeout: 30s # Timeout for flushing messages to Kafka
maxBufferedLogs: 1000 # MaxBufferedLogs sets the max amount of logs the client will buffer before blocking
produceSync: true # Increase reliability by stopping asynchronous publishing at the cost of reduced performance
topic: cerbos.audit.log # Name of the topic audit entries are written to
local:
advanced:
bufferSize: 256
Expand Down
6 changes: 5 additions & 1 deletion go.mod
Expand Up @@ -66,6 +66,8 @@ require (
github.com/tidwall/gjson v1.14.4
github.com/tidwall/pretty v1.2.1
github.com/tidwall/sjson v1.2.5
github.com/twmb/franz-go v1.13.0
github.com/twmb/franz-go/plugin/kzap v1.1.2
go.elastic.co/ecszap v1.0.1
go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0
Expand Down Expand Up @@ -182,7 +184,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/klauspost/compress v1.16.3 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/lestrrat-go/blackmagic v1.0.1 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
Expand All @@ -208,6 +210,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/opencontainers/runc v1.1.2 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -227,6 +230,7 @@ require (
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
Expand Down
12 changes: 10 additions & 2 deletions go.sum
Expand Up @@ -1626,8 +1626,8 @@ github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY=
github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
Expand Down Expand Up @@ -1943,6 +1943,8 @@ github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=
github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFzPPsI=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
Expand Down Expand Up @@ -2205,6 +2207,12 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM=
github.com/twmb/franz-go v1.13.0 h1:J4VyTXVlOhiCDCXS56ut2ZRAylaimPXnIqtCq9Wlfbw=
github.com/twmb/franz-go v1.13.0/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk=
github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s=
github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/plugin/kzap v1.1.2 h1:0arX5xJ0soUPX1LlDay6ZZoxuWkWk1lggQ5M/IgRXAE=
github.com/twmb/franz-go/plugin/kzap v1.1.2/go.mod h1:53Cl9Uz1pbdOPDvUISIxLrZIWSa2jCuY1bTMauRMBmo=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
Expand Down
99 changes: 99 additions & 0 deletions internal/audit/kafka/conf.go
@@ -0,0 +1,99 @@
// Copyright 2021-2023 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0

package kafka

import (
"errors"
"fmt"
"strings"
"time"

"github.com/cerbos/cerbos/internal/audit"
"github.com/twmb/franz-go/pkg/kgo"
)

const confKey = audit.ConfKey + ".kafka"

const (
defaultAcknowledgement = AckAll
defaultEncoding = EncodingJSON
defaultFlushTimeout = 30 * time.Second
defaultClientID = "cerbos"
defaultMaxBufferedLogs = 250
)

// Conf is optional configuration for kafka Audit.
type Conf struct {
// Required acknowledgement for messages, accepts none, leader or the default all. Idempotency disabled when not all
Ack string `yaml:"ack" conf:",example=all"`
// Name of the topic audit entries are written to
Topic string `yaml:"topic" conf:",example=cerbos.audit.log"`
// Data format written to Kafka, accepts either json (default) or protobuf
Encoding Encoding `yaml:"encoding" conf:",example=protobuf"`
// Identifier sent with all requests to Kafka
ClientID string `yaml:"clientID" conf:",example=cerbos"`
// Seed brokers Kafka client will connect to
Brokers []string `yaml:"brokers" conf:",example=['localhost:9092']"`
// Timeout for flushing messages to Kafka
FlushTimeout time.Duration `yaml:"flushTimeout" conf:",example=30s"`
// MaxBufferedLogs sets the max amount of logs the client will buffer before blocking
MaxBufferedLogs int `yaml:"maxBufferedLogs" conf:",example=1000"`
// Increase reliability by stopping asynchronous publishing at the cost of reduced performance
ProduceSync bool `yaml:"produceSync" conf:",example=true"`
}

func (c *Conf) Key() string {
return confKey
}

func (c *Conf) SetDefaults() {
c.Ack = defaultAcknowledgement
c.Encoding = defaultEncoding
c.FlushTimeout = defaultFlushTimeout
c.ClientID = defaultClientID
c.MaxBufferedLogs = defaultMaxBufferedLogs
}

func (c *Conf) Validate() error {
if _, err := formatAck(c.Ack); err != nil {
return err
}

if strings.TrimSpace(c.Topic) == "" {
return errors.New("invalid topic")
}

switch c.Encoding {
case EncodingJSON, EncodingProtobuf:
default:
return fmt.Errorf("invalid encoding format: %s", c.Encoding)
}

if c.FlushTimeout <= 0 {
return errors.New("invalid flush timeout")
}

if strings.TrimSpace(c.ClientID) == "" {
return errors.New("invalid client ID")
}

if len(c.Brokers) == 0 {
return errors.New("empty brokers")
}

return nil
}

func formatAck(ack string) (kgo.Acks, error) {
switch ack {
case AckNone:
return kgo.NoAck(), nil
case AckAll:
return kgo.AllISRAcks(), nil
case AckLeader:
return kgo.LeaderAck(), nil
default:
return kgo.NoAck(), fmt.Errorf("invalid ack value: %s", ack)
}
}
53 changes: 53 additions & 0 deletions internal/audit/kafka/marshal_test.go
@@ -0,0 +1,53 @@
// Copyright 2021-2023 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0

package kafka

import (
"fmt"
"testing"

auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1"
)

var encoding = []Encoding{EncodingJSON, EncodingProtobuf}

func BenchmarkRecordMarshaller_AccessLog(b *testing.B) {
for _, enc := range encoding {
b.Run(fmt.Sprintf("encoding_%s", enc), func(b *testing.B) {
m := newMarshaller(enc)
rec := &auditv1.AccessLogEntry{
CallId: "01ARZ3NDEKTSV4RRFFQ69G5FAV",
}

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
if _, err := m.Marshal(rec, KindAccess); err != nil {
b.Fatal(err)
}
}
})
}
}

func BenchmarkRecordMarshaller_DecisionLog(b *testing.B) {
for _, enc := range encoding {
b.Run(fmt.Sprintf("encoding_%s", enc), func(b *testing.B) {
m := newMarshaller(enc)
rec := &auditv1.DecisionLogEntry{
CallId: "01ARZ3NDEKTSV4RRFFQ69G5FAV",
}

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
if _, err := m.Marshal(rec, KindDecision); err != nil {
b.Fatal(err)
}
}
})
}
}

0 comments on commit 4a440e9

Please sign in to comment.