Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Kafka audit log backend Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Support JSON & Proto encoding Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Async publishing Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Generate missing configuration Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Fix broker config example Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Include audi kind as header Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Test helpers use correct line nums Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Fix field alignment Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Default to async publishing Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Flush buffered messages Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> Ignore error for now Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Enable debug logger on kafka client Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Configure client ID Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Configure max buffered Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Configure required acks Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> Disable idempotency when not ack all Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Drop 2nd item to fix docs issue Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * YAML parsing converts to duration Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Remove header allocations Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Refactor marshalling & various cleanup Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> Regen docs Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Return error when flushing on close Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Not required, formatAck catches invalid Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> * Drop type switching and embed proto.Message Signed-off-by: Rob Crowe <nobby.crowe@gmail.com> --------- Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>
- Loading branch information
Showing
8 changed files
with
609 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
// Copyright 2021-2023 Zenauth Ltd. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package kafka | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"strings" | ||
"time" | ||
|
||
"github.com/cerbos/cerbos/internal/audit" | ||
"github.com/twmb/franz-go/pkg/kgo" | ||
) | ||
|
||
const confKey = audit.ConfKey + ".kafka" | ||
|
||
const ( | ||
defaultAcknowledgement = AckAll | ||
defaultEncoding = EncodingJSON | ||
defaultFlushTimeout = 30 * time.Second | ||
defaultClientID = "cerbos" | ||
defaultMaxBufferedLogs = 250 | ||
) | ||
|
||
// Conf is optional configuration for kafka Audit. | ||
type Conf struct { | ||
// Required acknowledgement for messages, accepts none, leader or the default all. Idempotency disabled when not all | ||
Ack string `yaml:"ack" conf:",example=all"` | ||
// Name of the topic audit entries are written to | ||
Topic string `yaml:"topic" conf:",example=cerbos.audit.log"` | ||
// Data format written to Kafka, accepts either json (default) or protobuf | ||
Encoding Encoding `yaml:"encoding" conf:",example=protobuf"` | ||
// Identifier sent with all requests to Kafka | ||
ClientID string `yaml:"clientID" conf:",example=cerbos"` | ||
// Seed brokers Kafka client will connect to | ||
Brokers []string `yaml:"brokers" conf:",example=['localhost:9092']"` | ||
// Timeout for flushing messages to Kafka | ||
FlushTimeout time.Duration `yaml:"flushTimeout" conf:",example=30s"` | ||
// MaxBufferedLogs sets the max amount of logs the client will buffer before blocking | ||
MaxBufferedLogs int `yaml:"maxBufferedLogs" conf:",example=1000"` | ||
// Increase reliability by stopping asynchronous publishing at the cost of reduced performance | ||
ProduceSync bool `yaml:"produceSync" conf:",example=true"` | ||
} | ||
|
||
func (c *Conf) Key() string { | ||
return confKey | ||
} | ||
|
||
func (c *Conf) SetDefaults() { | ||
c.Ack = defaultAcknowledgement | ||
c.Encoding = defaultEncoding | ||
c.FlushTimeout = defaultFlushTimeout | ||
c.ClientID = defaultClientID | ||
c.MaxBufferedLogs = defaultMaxBufferedLogs | ||
} | ||
|
||
func (c *Conf) Validate() error { | ||
if _, err := formatAck(c.Ack); err != nil { | ||
return err | ||
} | ||
|
||
if strings.TrimSpace(c.Topic) == "" { | ||
return errors.New("invalid topic") | ||
} | ||
|
||
switch c.Encoding { | ||
case EncodingJSON, EncodingProtobuf: | ||
default: | ||
return fmt.Errorf("invalid encoding format: %s", c.Encoding) | ||
} | ||
|
||
if c.FlushTimeout <= 0 { | ||
return errors.New("invalid flush timeout") | ||
} | ||
|
||
if strings.TrimSpace(c.ClientID) == "" { | ||
return errors.New("invalid client ID") | ||
} | ||
|
||
if len(c.Brokers) == 0 { | ||
return errors.New("empty brokers") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func formatAck(ack string) (kgo.Acks, error) { | ||
switch ack { | ||
case AckNone: | ||
return kgo.NoAck(), nil | ||
case AckAll: | ||
return kgo.AllISRAcks(), nil | ||
case AckLeader: | ||
return kgo.LeaderAck(), nil | ||
default: | ||
return kgo.NoAck(), fmt.Errorf("invalid ack value: %s", ack) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
// Copyright 2021-2023 Zenauth Ltd. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package kafka | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1" | ||
) | ||
|
||
var encoding = []Encoding{EncodingJSON, EncodingProtobuf} | ||
|
||
func BenchmarkRecordMarshaller_AccessLog(b *testing.B) { | ||
for _, enc := range encoding { | ||
b.Run(fmt.Sprintf("encoding_%s", enc), func(b *testing.B) { | ||
m := newMarshaller(enc) | ||
rec := &auditv1.AccessLogEntry{ | ||
CallId: "01ARZ3NDEKTSV4RRFFQ69G5FAV", | ||
} | ||
|
||
b.ResetTimer() | ||
b.ReportAllocs() | ||
|
||
for i := 0; i < b.N; i++ { | ||
if _, err := m.Marshal(rec, KindAccess); err != nil { | ||
b.Fatal(err) | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func BenchmarkRecordMarshaller_DecisionLog(b *testing.B) { | ||
for _, enc := range encoding { | ||
b.Run(fmt.Sprintf("encoding_%s", enc), func(b *testing.B) { | ||
m := newMarshaller(enc) | ||
rec := &auditv1.DecisionLogEntry{ | ||
CallId: "01ARZ3NDEKTSV4RRFFQ69G5FAV", | ||
} | ||
|
||
b.ResetTimer() | ||
b.ReportAllocs() | ||
|
||
for i := 0; i < b.N; i++ { | ||
if _, err := m.Marshal(rec, KindDecision); err != nil { | ||
b.Fatal(err) | ||
} | ||
} | ||
}) | ||
} | ||
} |
Oops, something went wrong.