From a9e540dfb64ac0f4e30d29204117a53a6c52fdff Mon Sep 17 00:00:00 2001
From: Rob Crowe <282571+rcrowe@users.noreply.github.com>
Date: Fri, 31 Mar 2023 11:16:37 +0100
Subject: [PATCH] fix: Kafka async publish fails when the API request returns
 (#1510)

* Fix comment to relate to what it configures

Signed-off-by: Rob Crowe

* Remove gRPC elements to the log

Signed-off-by: Rob Crowe

* Timeout is specifically when closing

Signed-off-by: Rob Crowe

* Publish timeout that detaches ctx for async publish

Signed-off-by: Rob Crowe

* Update Kafka audit backend docs

Signed-off-by: Rob Crowe

* Missed documentation

Signed-off-by: Rob Crowe

* Keep consistent with Kafka produce naming

Signed-off-by: Rob Crowe

---------

Signed-off-by: Rob Crowe
---
 docs/modules/configuration/pages/audit.adoc |  6 +++--
 .../partials/fullconfiguration.adoc         |  3 ++-
 internal/audit/kafka/conf.go                | 20 +++++++++++-----
 internal/audit/kafka/publisher.go           | 24 +++++++++++++------
 4 files changed, 37 insertions(+), 16 deletions(-)

diff --git a/docs/modules/configuration/pages/audit.adoc b/docs/modules/configuration/pages/audit.adoc
index b8a26322b..64715df2a 100644
--- a/docs/modules/configuration/pages/audit.adoc
+++ b/docs/modules/configuration/pages/audit.adoc
@@ -32,10 +32,11 @@ audit:
     ack: all # Ack mode for producing messages. Valid values are "none", "leader" or "all" (default). Idempotency is disabled when mode is not "all".
     brokers: ['localhost:9092'] # Required. Brokers list to seed the Kafka client.
     clientID: cerbos # ClientID reported in Kafka connections.
+    closeTimeout: 30s # CloseTimeout sets how long when closing the client to wait for any remaining messages to be flushed.
     encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
-    flushTimeout: 30s # FlushTimeout sets how often messages are flushed to the remote Kafka server.
     maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
     produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
+    produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
     topic: cerbos.audit.log # Required. Topic to write audit entries to.
 ----
 
@@ -138,9 +139,10 @@ audit:
     ack: all # Ack mode for producing messages. Valid values are "none", "leader" or "all" (default). Idempotency is disabled when mode is not "all".
     brokers: ['localhost:9092'] # Required. Brokers list to seed the Kafka client.
     clientID: cerbos # ClientID reported in Kafka connections.
+    closeTimeout: 30s # CloseTimeout sets how long when closing the client to wait for any remaining messages to be flushed.
     encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
-    flushTimeout: 30s # FlushTimeout sets how often messages are flushed to the remote Kafka server.
     maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
     produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
+    produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
     topic: cerbos.audit.log # Required. Topic to write audit entries to.
 ----
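For orientation, the sketch below restates what the two documented timeouts govern: `produceTimeout` bounds a single publish attempt, while `closeTimeout` bounds the final flush of buffered records at shutdown. This is a minimal franz-go sketch, not Cerbos's publisher; the broker address, client options, and topic simply echo the example values from the docs above.

[source,go]
----
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Values mirror the documented defaults above.
	const (
		closeTimeout   = 30 * time.Second // bounds the final flush at shutdown
		produceTimeout = 5 * time.Second  // bounds each publish attempt
	)

	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ClientID("cerbos"),
		kgo.MaxBufferedRecords(1000),
	)
	if err != nil {
		log.Fatal(err)
	}

	// produceTimeout: give up on a single record after 5s instead of
	// staying tied to the caller's request-scoped context.
	produceCtx, cancelProduce := context.WithTimeout(context.Background(), produceTimeout)
	defer cancelProduce()
	client.Produce(produceCtx, &kgo.Record{Topic: "cerbos.audit.log", Value: []byte(`{}`)},
		func(_ *kgo.Record, err error) {
			if err != nil {
				log.Printf("failed to write audit log entry: %v", err)
			}
		})

	// closeTimeout: on shutdown, wait up to 30s for buffered records to be
	// flushed before closing the client.
	flushCtx, cancelFlush := context.WithTimeout(context.Background(), closeTimeout)
	defer cancelFlush()
	if err := client.Flush(flushCtx); err != nil {
		log.Printf("flush: %v", err)
	}
	client.Close()
}
----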
diff --git a/docs/modules/configuration/partials/fullconfiguration.adoc b/docs/modules/configuration/partials/fullconfiguration.adoc
index 373aa334e..2cd320dc9 100644
--- a/docs/modules/configuration/partials/fullconfiguration.adoc
+++ b/docs/modules/configuration/partials/fullconfiguration.adoc
@@ -17,10 +17,11 @@ audit:
     ack: all # Ack mode for producing messages. Valid values are "none", "leader" or "all" (default). Idempotency is disabled when mode is not "all".
     brokers: ['localhost:9092'] # Required. Brokers list to seed the Kafka client.
     clientID: cerbos # ClientID reported in Kafka connections.
+    closeTimeout: 30s # CloseTimeout sets how long when closing the client to wait for any remaining messages to be flushed.
     encoding: json # Encoding format. Valid values are "json" (default) or "protobuf".
-    flushTimeout: 30s # FlushTimeout sets how often messages are flushed to the remote Kafka server.
     maxBufferedRecords: 1000 # MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
     produceSync: false # ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
+    produceTimeout: 5s # ProduceTimeout sets how long to attempt to publish a message before giving up.
     topic: cerbos.audit.log # Required. Topic to write audit entries to.
   local:
     advanced:
diff --git a/internal/audit/kafka/conf.go b/internal/audit/kafka/conf.go
index dcff38d63..66148275f 100644
--- a/internal/audit/kafka/conf.go
+++ b/internal/audit/kafka/conf.go
@@ -18,7 +18,8 @@ const confKey = audit.ConfKey + ".kafka"
 const (
 	defaultAcknowledgement    = AckAll
 	defaultEncoding           = EncodingJSON
-	defaultFlushTimeout       = 30 * time.Second
+	defaultCloseTimeout       = 30 * time.Second
+	defaultProduceTimeout     = 5 * time.Second
 	defaultClientID           = "cerbos"
 	defaultMaxBufferedRecords = 250
 )
@@ -35,12 +36,14 @@ type Conf struct {
 	ClientID string `yaml:"clientID" conf:",example=cerbos"`
 	// Brokers list to seed the Kafka client.
 	Brokers []string `yaml:"brokers" conf:"required,example=['localhost:9092']"`
-	// FlushTimeout sets how often messages are flushed to the remote Kafka server.
-	FlushTimeout time.Duration `yaml:"flushTimeout" conf:",example=30s"`
+	// CloseTimeout sets how long when closing the client to wait for any remaining messages to be flushed.
+	CloseTimeout time.Duration `yaml:"closeTimeout" conf:",example=30s"`
 	// MaxBufferedRecords sets the maximum number of records the client should buffer in memory in async mode.
 	MaxBufferedRecords int `yaml:"maxBufferedRecords" conf:",example=1000"`
 	// ProduceSync forces the client to produce messages to Kafka synchronously. This can have a significant impact on performance.
 	ProduceSync bool `yaml:"produceSync" conf:",example=false"`
+	// ProduceTimeout sets how long to attempt to publish a message before giving up.
+	ProduceTimeout time.Duration `yaml:"produceTimeout" conf:",example=5s"`
 }
 
 func (c *Conf) Key() string {
@@ -50,7 +53,8 @@ func (c *Conf) Key() string {
 func (c *Conf) SetDefaults() {
 	c.Ack = defaultAcknowledgement
 	c.Encoding = defaultEncoding
-	c.FlushTimeout = defaultFlushTimeout
+	c.CloseTimeout = defaultCloseTimeout
+	c.ProduceTimeout = defaultProduceTimeout
 	c.ClientID = defaultClientID
 	c.MaxBufferedRecords = defaultMaxBufferedRecords
 }
@@ -70,8 +74,12 @@ func (c *Conf) Validate() error {
 		return fmt.Errorf("invalid encoding format: %s", c.Encoding)
 	}
 
-	if c.FlushTimeout <= 0 {
-		return errors.New("invalid flush timeout")
+	if c.CloseTimeout <= 0 {
+		return errors.New("invalid close timeout")
+	}
+
+	if c.ProduceTimeout <= 0 {
+		return errors.New("invalid produce timeout")
 	}
 
 	if strings.TrimSpace(c.ClientID) == "" {
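The defaulting and validation changes above are small but easy to misread in diff form. The following self-contained sketch restates just the two new rules; `conf`, `setDefaults`, and `validate` here are trimmed-down stand-ins, not the real `Conf` in internal/audit/kafka/conf.go, which carries more fields and checks.

[source,go]
----
package main

import (
	"errors"
	"fmt"
	"time"
)

// conf is a stand-in for the corner of kafka.Conf touched by this patch.
type conf struct {
	CloseTimeout   time.Duration
	ProduceTimeout time.Duration
}

func (c *conf) setDefaults() {
	c.CloseTimeout = 30 * time.Second  // mirrors defaultCloseTimeout
	c.ProduceTimeout = 5 * time.Second // mirrors defaultProduceTimeout
}

func (c *conf) validate() error {
	// Both durations must be strictly positive, as in Conf.Validate above.
	if c.CloseTimeout <= 0 {
		return errors.New("invalid close timeout")
	}
	if c.ProduceTimeout <= 0 {
		return errors.New("invalid produce timeout")
	}
	return nil
}

func main() {
	var c conf
	c.setDefaults()
	fmt.Println(c.validate()) // <nil>

	c.ProduceTimeout = 0      // e.g. produceTimeout: 0s in the YAML
	fmt.Println(c.validate()) // invalid produce timeout
}
----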
diff --git a/internal/audit/kafka/publisher.go b/internal/audit/kafka/publisher.go
index 47c82e539..e49d7ee2d 100644
--- a/internal/audit/kafka/publisher.go
+++ b/internal/audit/kafka/publisher.go
@@ -9,7 +9,7 @@ import (
 	"os"
 	"time"
 
-	"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
+	"github.com/cerbos/cerbos/internal/observability/logging"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/plugin/kzap"
 	"go.uber.org/zap"
@@ -68,7 +68,8 @@ type Publisher struct {
 	decisionFilter audit.DecisionLogEntryFilter
 	marshaller     recordMarshaller
 	sync           bool
-	flushTimeout   time.Duration
+	closeTimeout   time.Duration
+	produceTimeout time.Duration
 }
 
 func NewPublisher(conf *Conf, decisionFilter audit.DecisionLogEntryFilter) (*Publisher, error) {
@@ -104,12 +105,13 @@ func NewPublisher(conf *Conf, decisionFilter audit.DecisionLogEntryFilter) (*Pub
 		decisionFilter: decisionFilter,
 		marshaller:     newMarshaller(conf.Encoding),
 		sync:           conf.ProduceSync,
-		flushTimeout:   conf.FlushTimeout,
+		closeTimeout:   conf.CloseTimeout,
+		produceTimeout: conf.ProduceTimeout,
 	}, nil
 }
 
 func (p *Publisher) Close() error {
-	flushCtx, flushCancel := context.WithTimeout(context.Background(), p.flushTimeout)
+	flushCtx, flushCancel := context.WithTimeout(context.Background(), p.closeTimeout)
 	defer flushCancel()
 	if err := p.Client.Flush(flushCtx); err != nil {
 		return err
@@ -164,13 +166,21 @@ func (p *Publisher) WriteDecisionLogEntry(ctx context.Context, record audit.Deci
 
 func (p *Publisher) write(ctx context.Context, msg *kgo.Record) error {
 	if p.sync {
-		return p.Client.ProduceSync(ctx, msg).FirstErr()
+		produceCtx, produceCancel := context.WithTimeout(ctx, p.produceTimeout)
+		defer produceCancel()
+
+		return p.Client.ProduceSync(produceCtx, msg).FirstErr()
 	}
 
-	p.Client.Produce(ctx, msg, func(r *kgo.Record, err error) {
+	// detach the context from the caller so the request can return
+	// without cancelling any async kafka operations
+	produceCtx, produceCancel := context.WithTimeout(context.Background(), p.produceTimeout)
+	defer produceCancel()
+
+	p.Client.Produce(produceCtx, msg, func(r *kgo.Record, err error) {
 		if err != nil {
 			// TODO: Handle via interceptor
-			ctxzap.Extract(ctx).Warn("failed to write audit log entry", zap.Error(err))
+			logging.FromContext(ctx).Warn("failed to write audit log entry", zap.Error(err))
 		}
 	})
 	return nil
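The heart of the fix is the detached context in the async branch of `write`. Previously the caller's request-scoped `ctx` was handed to `kgo.Client.Produce`, so an in-flight record could be cancelled the moment the API request returned. The sketch below reproduces that failure mode and the fix using nothing but the standard library; `asyncPublish`, the channel, and the timings are invented for illustration and are not the franz-go API.

[source,go]
----
package main

import (
	"context"
	"fmt"
	"time"
)

// asyncPublish mimics a fire-and-forget producer: the publish completes on
// its own goroutine but honours ctx for cancellation, much as an unsent
// record can be failed by its context.
func asyncPublish(ctx context.Context, done chan<- error) {
	go func() {
		select {
		case <-time.After(50 * time.Millisecond): // simulated broker round trip
			done <- nil
		case <-ctx.Done():
			done <- ctx.Err()
		}
	}()
}

func main() {
	done := make(chan error, 2)

	// Buggy pattern: the request-scoped context dies as soon as the API
	// request returns, taking the in-flight publish down with it.
	reqCtx, requestReturns := context.WithCancel(context.Background())
	asyncPublish(reqCtx, done)
	requestReturns()                     // the API request returns here
	fmt.Println("request ctx: ", <-done) // context.Canceled

	// Fixed pattern: detach from the caller and bound the attempt with a
	// produce timeout instead (5s mirrors the new produceTimeout default).
	produceCtx, produceCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer produceCancel()
	asyncPublish(produceCtx, done)
	fmt.Println("detached ctx:", <-done) // <nil>
}
----

Note that the synchronous branch of `write` keeps deriving from the caller's context, since the request is still waiting on the result; it only gains the produce timeout. The detachment applies solely to the async path.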