Skip to content

Commit 2e335d5

Browse files
authoredApr 18, 2024
enhancement: Write audit logs asynchronously (#2104)
If requests are very large, serializing them to stdout or files can take a while. This PR makes the audit log write asynchronous so that the response can be returned without waiting for slow output sinks. Signed-off-by: Charith Ellawala <charith@cerbos.dev> Write audit log entries asynchronously --------- Signed-off-by: Charith Ellawala <charith@cerbos.dev>
1 parent b022d25 commit 2e335d5

File tree

4 files changed

+35
-14
lines changed

4 files changed

+35
-14
lines changed
 

‎go.mod

-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ require (
5454
github.com/microsoft/go-mssqldb v1.7.0
5555
github.com/minio/minio-go/v7 v7.0.69
5656
github.com/nlepage/go-tarfs v1.2.1
57-
github.com/ohler55/ojg v1.21.4
5857
github.com/oklog/ulid/v2 v2.1.0
5958
github.com/olekukonko/tablewriter v0.0.5
6059
github.com/ory/dockertest/v3 v3.10.0

‎go.sum

-2
Original file line numberDiff line numberDiff line change
@@ -579,8 +579,6 @@ github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJm
579579
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
580580
github.com/nlepage/go-tarfs v1.2.1 h1:o37+JPA+ajllGKSPfy5+YpsNHDjZnAoyfvf5GsUa+Ks=
581581
github.com/nlepage/go-tarfs v1.2.1/go.mod h1:rno18mpMy9aEH1IiJVftFsqPyIpwqSUiAOpJYjlV2NA=
582-
github.com/ohler55/ojg v1.21.4 h1:2iWyz/xExx0XySVIxR9kWFxIdsLNrpWLrKuAcs5aOZU=
583-
github.com/ohler55/ojg v1.21.4/go.mod h1:gQhDVpQLqrmnd2eqGAvJtn+NfKoYJbe/A4Sj3/Vro4o=
584582
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
585583
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
586584
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=

‎internal/audit/kafka/kafka_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func TestProduceWithTLS(t *testing.T) {
7979
}, nil
8080
})
8181
require.NoError(t, err)
82+
require.NoError(t, log.Close())
8283

8384
// validate we see this entries in kafka
8485
records, err := fetchKafkaTopic(t, uri, defaultIntegrationTopic, true)
@@ -118,6 +119,7 @@ func TestSyncProduce(t *testing.T) {
118119
}, nil
119120
})
120121
require.NoError(t, err)
122+
require.NoError(t, log.Close())
121123

122124
// validate we see this entries in kafka
123125
records, err := fetchKafkaTopic(t, uri, defaultIntegrationTopic, false)
@@ -156,6 +158,7 @@ func TestCompression(t *testing.T) {
156158
}, nil
157159
})
158160
require.NoError(t, err)
161+
require.NoError(t, log.Close())
159162
}
160163

161164
// validate we see these entries in kafka
@@ -196,6 +199,7 @@ func TestAsyncProduce(t *testing.T) {
196199
}, nil
197200
})
198201
require.NoError(t, err)
202+
require.NoError(t, log.Close())
199203

200204
// validate we see this entries in kafka, eventually
201205
require.Eventually(t, func() bool {

‎internal/audit/log.go

+31-11
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@ import (
88
"errors"
99
"fmt"
1010
"io"
11+
"runtime"
1112
"sync"
1213
"time"
1314

15+
"github.com/sourcegraph/conc/pool"
16+
"go.uber.org/zap"
17+
1418
auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1"
1519
"github.com/cerbos/cerbos/internal/config"
20+
"github.com/cerbos/cerbos/internal/observability/logging"
1621
"github.com/cerbos/cerbos/internal/observability/metrics"
1722
)
1823

@@ -103,8 +108,7 @@ func NewLogFromConf(ctx context.Context, confW *config.Wrapper) (Log, error) {
103108
return nil, fmt.Errorf("failed to create backend: %w", err)
104109
}
105110

106-
lw := &logWrapper{conf: conf, backend: backend}
107-
111+
lw := newLogWrapper(conf, backend)
108112
if q, ok := backend.(QueryableLog); ok {
109113
return &queryableLogWrapper{logWrapper: lw, queryable: q}, nil
110114
}
@@ -115,13 +119,24 @@ func NewLogFromConf(ctx context.Context, confW *config.Wrapper) (Log, error) {
115119
// NewNopLog returns an audit log that does nothing.
116120
func NewNopLog() Log {
117121
conf := &Conf{confHolder: confHolder{Enabled: false, AccessLogsEnabled: false, DecisionLogsEnabled: false}}
118-
return &logWrapper{conf: conf}
122+
return newLogWrapper(conf, nil)
123+
}
124+
125+
func newLogWrapper(conf *Conf, backend Log) *logWrapper {
126+
lw := &logWrapper{conf: conf}
127+
if backend != nil {
128+
lw.backend = backend
129+
lw.pool = pool.New().WithMaxGoroutines(runtime.NumCPU())
130+
}
131+
132+
return lw
119133
}
120134

121135
// logWrapper wraps the backends and enforces the config options.
122136
type logWrapper struct {
123137
conf *Conf
124138
backend Log
139+
pool *pool.Pool
125140
}
126141

127142
func (lw *logWrapper) Backend() string {
@@ -137,10 +152,12 @@ func (lw *logWrapper) WriteAccessLogEntry(ctx context.Context, entry AccessLogEn
137152
return nil
138153
}
139154

140-
if err := lw.backend.WriteAccessLogEntry(ctx, entry); err != nil {
141-
metrics.Inc(ctx, metrics.AuditErrorCount(), metrics.KindKey(KindAccess))
142-
return err
143-
}
155+
lw.pool.Go(func() {
156+
if err := lw.backend.WriteAccessLogEntry(ctx, entry); err != nil {
157+
metrics.Inc(ctx, metrics.AuditErrorCount(), metrics.KindKey(KindAccess))
158+
logging.FromContext(ctx).Warn("Failed to write access log entry", zap.Error(err))
159+
}
160+
})
144161

145162
return nil
146163
}
@@ -150,16 +167,19 @@ func (lw *logWrapper) WriteDecisionLogEntry(ctx context.Context, entry DecisionL
150167
return nil
151168
}
152169

153-
if err := lw.backend.WriteDecisionLogEntry(ctx, entry); err != nil {
154-
metrics.Inc(ctx, metrics.AuditErrorCount(), metrics.KindKey(KindDecision))
155-
return err
156-
}
170+
lw.pool.Go(func() {
171+
if err := lw.backend.WriteDecisionLogEntry(ctx, entry); err != nil {
172+
metrics.Inc(ctx, metrics.AuditErrorCount(), metrics.KindKey(KindDecision))
173+
logging.FromContext(ctx).Warn("Failed to write decision log entry", zap.Error(err))
174+
}
175+
})
157176

158177
return nil
159178
}
160179

161180
func (lw *logWrapper) Close() error {
162181
if lw.backend != nil {
182+
lw.pool.Wait()
163183
return lw.backend.Close()
164184
}
165185
return nil

0 commit comments

Comments
 (0)