-
Notifications
You must be signed in to change notification settings - Fork 123
/
conf.go
99 lines (83 loc) · 2.75 KB
/
conf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Copyright 2021-2023 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0
package kafka
import (
"errors"
"fmt"
"strings"
"time"
"github.com/cerbos/cerbos/internal/audit"
"github.com/twmb/franz-go/pkg/kgo"
)
const confKey = audit.ConfKey + ".kafka"
const (
defaultAcknowledgement = AckAll
defaultEncoding = EncodingJSON
defaultCloseTimeout = 30 * time.Second
defaultClientID = "cerbos"
defaultMaxBufferedRecords = 250
)
// Conf is optional configuration for kafka Audit.
type Conf struct {
// Ack mode for producing messages. Valid values are "none", "leader" or "all" (default). Idempotency is disabled when mode is not "all".
Ack string `yaml:"ack" conf:",example=all"`
// Topic to write audit entries to.
Topic string `yaml:"topic" conf:"required,example=cerbos.audit.log"`
// Encoding format. Valid values are "json" (default) or "protobuf".
Encoding Encoding `yaml:"encoding" conf:",example=json"`
// ClientID reported in Kafka connections.
ClientID string `yaml:"clientID" conf:",example=cerbos"`
// Brokers list to seed the Kafka client.
Brokers []string `yaml:"brokers" conf:"required,example=['localhost:9092']"`
// CloseTimeout sets how long when closing the client to wait for any remaining messages to be flushed.
CloseTimeout time.Duration `yaml:"closeTimeout" conf:",example=30s"`
// MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
MaxBufferedRecords int `yaml:"maxBufferedRecords" conf:",example=1000"`
// ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
ProduceSync bool `yaml:"produceSync" conf:",example=false"`
}
func (c *Conf) Key() string {
return confKey
}
func (c *Conf) SetDefaults() {
c.Ack = defaultAcknowledgement
c.Encoding = defaultEncoding
c.CloseTimeout = defaultCloseTimeout
c.ClientID = defaultClientID
c.MaxBufferedRecords = defaultMaxBufferedRecords
}
func (c *Conf) Validate() error {
if _, err := formatAck(c.Ack); err != nil {
return err
}
if strings.TrimSpace(c.Topic) == "" {
return errors.New("invalid topic")
}
switch c.Encoding {
case EncodingJSON, EncodingProtobuf:
default:
return fmt.Errorf("invalid encoding format: %s", c.Encoding)
}
if c.CloseTimeout <= 0 {
return errors.New("invalid close timeout")
}
if strings.TrimSpace(c.ClientID) == "" {
return errors.New("invalid client ID")
}
if len(c.Brokers) == 0 {
return errors.New("empty brokers")
}
return nil
}
func formatAck(ack string) (kgo.Acks, error) {
switch ack {
case AckNone:
return kgo.NoAck(), nil
case AckAll:
return kgo.AllISRAcks(), nil
case AckLeader:
return kgo.LeaderAck(), nil
default:
return kgo.NoAck(), fmt.Errorf("invalid ack value: %s", ack)
}
}