fix: Kafka async publish fails when the API request returns (#1510)
* Fix comment to relate to what it configures

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Remove gRPC elements from the log

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Timeout is specifically when closing

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Publish timeout that detaches ctx for async publish

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Update Kafka audit backend docs

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Missed documentation

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

* Keep consistent with Kafka produce naming

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>

---------

Signed-off-by: Rob Crowe <nobby.crowe@gmail.com>
rcrowe committed Mar 31, 2023
1 parent 7507b68 commit a9e540d
Showing 4 changed files with 37 additions and 16 deletions.
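
The fix in a nutshell, before the per-file diffs: in async mode the publisher handed the caller's request-scoped context to the Kafka produce, so the publish could be cancelled as soon as the API request returned. A minimal stdlib-only sketch of that failure mode (illustrative Go, not Cerbos code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Stand-in for an API request context: cancelled when the handler returns.
	reqCtx, cancel := context.WithCancel(context.Background())

	// The buggy pattern: async work borrowing the request context.
	done := make(chan struct{})
	go func() {
		defer close(done)
		select {
		case <-time.After(100 * time.Millisecond): // pretend this is the Kafka produce
			fmt.Println("published")
		case <-reqCtx.Done():
			fmt.Println("publish cancelled:", reqCtx.Err()) // this branch wins
		}
	}()

	cancel() // the API request returns immediately
	<-done
}
```

The remedy, visible in publisher.go below, is to derive the produce context from context.Background() with its own produceTimeout instead of from the request.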
6 changes: 4 additions & 2 deletions docs/modules/configuration/pages/audit.adoc
@@ -32,10 +32,11 @@ audit:
 ack: all # Ack mode for producing messages. Valid values are "none", "leader" or "all" (default). Idempotency is disabled when mode is not "all".
 brokers: ['localhost:9092'] # Required. Brokers list to seed the Kafka client.
 clientID: cerbos # ClientID reported in Kafka connections.
+closeTimeout: 30s # CloseTimeout sets how long to wait for any remaining messages to be flushed when closing the client.
 encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
-flushTimeout: 30s # FlushTimeout sets how often messages are flushed to the remote Kafka server.
 maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
 produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
+produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
 topic: cerbos.audit.log # Required. Topic to write audit entries to.
 ----

@@ -138,9 +139,10 @@ audit:
 ack: all # Ack mode for producing messages. Valid values are "none", "leader" or "all" (default). Idempotency is disabled when mode is not "all".
 brokers: ['localhost:9092'] # Required. Brokers list to seed the Kafka client.
 clientID: cerbos # ClientID reported in Kafka connections.
+closeTimeout: 30s # CloseTimeout sets how long to wait for any remaining messages to be flushed when closing the client.
 encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
-flushTimeout: 30s # FlushTimeout sets how often messages are flushed to the remote Kafka server.
 maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
 produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
+produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
 topic: cerbos.audit.log # Required. Topic to write audit entries to.
 ----
3 changes: 2 additions & 1 deletion docs/modules/configuration/partials/fullconfiguration.adoc
@@ -17,10 +17,11 @@ audit:
 ack: all # Ack mode for producing messages. Valid values are "none", "leader" or "all" (default). Idempotency is disabled when mode is not "all".
 brokers: ['localhost:9092'] # Required. Brokers list to seed the Kafka client.
 clientID: cerbos # ClientID reported in Kafka connections.
+closeTimeout: 30s # CloseTimeout sets how long to wait for any remaining messages to be flushed when closing the client.
 encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
-flushTimeout: 30s # FlushTimeout sets how often messages are flushed to the remote Kafka server.
 maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
 produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
+produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
 topic: cerbos.audit.log # Required. Topic to write audit entries to.
 local:
 advanced:
20 changes: 14 additions & 6 deletions internal/audit/kafka/conf.go
@@ -18,7 +18,8 @@ const confKey = audit.ConfKey + ".kafka"
 const (
 	defaultAcknowledgement    = AckAll
 	defaultEncoding           = EncodingJSON
-	defaultFlushTimeout       = 30 * time.Second
+	defaultCloseTimeout       = 30 * time.Second
+	defaultProduceTimeout     = 5 * time.Second
 	defaultClientID           = "cerbos"
 	defaultMaxBufferedRecords = 250
 )
@@ -35,12 +36,14 @@ type Conf struct {
 	ClientID string `yaml:"clientID" conf:",example=cerbos"`
 	// Brokers list to seed the Kafka client.
 	Brokers []string `yaml:"brokers" conf:"required,example=['localhost:9092']"`
-	// FlushTimeout sets how often messages are flushed to the remote Kafka server.
-	FlushTimeout time.Duration `yaml:"flushTimeout" conf:",example=30s"`
+	// CloseTimeout sets how long to wait for any remaining messages to be flushed when closing the client.
+	CloseTimeout time.Duration `yaml:"closeTimeout" conf:",example=30s"`
 	// MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
 	MaxBufferedRecords int `yaml:"maxBufferedRecords" conf:",example=1000"`
 	// ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
 	ProduceSync bool `yaml:"produceSync" conf:",example=false"`
+	// ProduceTimeout sets how long to attempt to publish a message before giving up.
+	ProduceTimeout time.Duration `yaml:"produceTimeout" conf:",example=5s"`
 }

@@ -50,7 +53,8 @@ func (c *Conf) Key() string {
 func (c *Conf) SetDefaults() {
 	c.Ack = defaultAcknowledgement
 	c.Encoding = defaultEncoding
-	c.FlushTimeout = defaultFlushTimeout
+	c.CloseTimeout = defaultCloseTimeout
+	c.ProduceTimeout = defaultProduceTimeout
 	c.ClientID = defaultClientID
 	c.MaxBufferedRecords = defaultMaxBufferedRecords
 }
@@ -70,8 +74,12 @@ func (c *Conf) Validate() error {
 		return fmt.Errorf("invalid encoding format: %s", c.Encoding)
 	}

-	if c.FlushTimeout <= 0 {
-		return errors.New("invalid flush timeout")
+	if c.CloseTimeout <= 0 {
+		return errors.New("invalid close timeout")
 	}

+	if c.ProduceTimeout <= 0 {
+		return errors.New("invalid produce timeout")
+	}
+
 	if strings.TrimSpace(c.ClientID) == "" {
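
To see the new knobs end to end, a hypothetical test-style snippet (assumed to live in the same kafka package; Brokers and Topic are the required fields from the config reference above) exercising SetDefaults and the new Validate checks:

```go
package kafka

import (
	"testing"
	"time"
)

func TestTimeoutConf(t *testing.T) {
	c := &Conf{Brokers: []string{"localhost:9092"}, Topic: "cerbos.audit.log"}
	c.SetDefaults()

	// Defaults introduced by this commit: 30s to drain on close, 5s per publish attempt.
	if c.CloseTimeout != 30*time.Second || c.ProduceTimeout != 5*time.Second {
		t.Fatalf("unexpected defaults: close=%v, produce=%v", c.CloseTimeout, c.ProduceTimeout)
	}

	// Zero or negative timeouts are now rejected.
	c.ProduceTimeout = 0
	if err := c.Validate(); err == nil {
		t.Fatal("expected an invalid produce timeout error")
	}
}
```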
24 changes: 17 additions & 7 deletions internal/audit/kafka/publisher.go
@@ -9,7 +9,7 @@ import (
 	"os"
 	"time"

-	"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
+	"github.com/cerbos/cerbos/internal/observability/logging"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/plugin/kzap"
 	"go.uber.org/zap"
@@ -68,7 +68,8 @@ type Publisher struct {
 	decisionFilter audit.DecisionLogEntryFilter
 	marshaller     recordMarshaller
 	sync           bool
-	flushTimeout   time.Duration
+	closeTimeout   time.Duration
+	produceTimeout time.Duration
 }

@@ -104,12 +105,13 @@ func NewPublisher(conf *Conf, decisionFilter audit.DecisionLogEntryFilter) (*Publisher, error) {
 		decisionFilter: decisionFilter,
 		marshaller:     newMarshaller(conf.Encoding),
 		sync:           conf.ProduceSync,
-		flushTimeout:   conf.FlushTimeout,
+		closeTimeout:   conf.CloseTimeout,
+		produceTimeout: conf.ProduceTimeout,
 	}, nil
 }

 func (p *Publisher) Close() error {
-	flushCtx, flushCancel := context.WithTimeout(context.Background(), p.flushTimeout)
+	flushCtx, flushCancel := context.WithTimeout(context.Background(), p.closeTimeout)
 	defer flushCancel()
 	if err := p.Client.Flush(flushCtx); err != nil {
 		return err
@@ -164,13 +166,21 @@ func (p *Publisher) WriteDecisionLogEntry(ctx context.Context, record audit.Deci

 func (p *Publisher) write(ctx context.Context, msg *kgo.Record) error {
 	if p.sync {
-		return p.Client.ProduceSync(ctx, msg).FirstErr()
+		produceCtx, produceCancel := context.WithTimeout(ctx, p.produceTimeout)
+		defer produceCancel()
+
+		return p.Client.ProduceSync(produceCtx, msg).FirstErr()
 	}

-	p.Client.Produce(ctx, msg, func(r *kgo.Record, err error) {
+	// detach the context from the caller so the request can return
+	// without cancelling any async kafka operations
+	produceCtx, produceCancel := context.WithTimeout(context.Background(), p.produceTimeout)
+	defer produceCancel()
+
+	p.Client.Produce(produceCtx, msg, func(r *kgo.Record, err error) {
 		if err != nil {
 			// TODO: Handle via interceptor
-			ctxzap.Extract(ctx).Warn("failed to write audit log entry", zap.Error(err))
+			logging.FromContext(ctx).Warn("failed to write audit log entry", zap.Error(err))
 		}
 	})
 	return nil
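
The async branch above is the heart of the fix. Extracted into a standalone sketch (publishDetached is an illustrative name, not a Cerbos API; the stdlib logger stands in for the context-scoped logger the real code pulls from ctx):

```go
package sketch

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// publishDetached mirrors the detached-context pattern: reqCtx may be
// cancelled the moment the API handler returns, so the produce runs on
// its own timeout derived from context.Background().
func publishDetached(reqCtx context.Context, client *kgo.Client, rec *kgo.Record, timeout time.Duration) {
	produceCtx, cancel := context.WithTimeout(context.Background(), timeout)

	client.Produce(produceCtx, rec, func(r *kgo.Record, err error) {
		defer cancel() // release the timer once the produce settles
		if err != nil {
			log.Printf("failed to write audit log entry: %v", err)
		}
	})
}
```

One deliberate difference from the committed code: here cancel runs in the produce callback rather than being deferred in the enclosing function, so the timeout context stays alive until the record has actually been delivered or failed.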
