@@ -8,11 +8,16 @@ import (
8
8
"errors"
9
9
"fmt"
10
10
"io"
11
+ "runtime"
11
12
"sync"
12
13
"time"
13
14
15
+ "github.com/sourcegraph/conc/pool"
16
+ "go.uber.org/zap"
17
+
14
18
auditv1 "github.com/cerbos/cerbos/api/genpb/cerbos/audit/v1"
15
19
"github.com/cerbos/cerbos/internal/config"
20
+ "github.com/cerbos/cerbos/internal/observability/logging"
16
21
"github.com/cerbos/cerbos/internal/observability/metrics"
17
22
)
18
23
@@ -103,8 +108,7 @@ func NewLogFromConf(ctx context.Context, confW *config.Wrapper) (Log, error) {
103
108
return nil , fmt .Errorf ("failed to create backend: %w" , err )
104
109
}
105
110
106
- lw := & logWrapper {conf : conf , backend : backend }
107
-
111
+ lw := newLogWrapper (conf , backend )
108
112
if q , ok := backend .(QueryableLog ); ok {
109
113
return & queryableLogWrapper {logWrapper : lw , queryable : q }, nil
110
114
}
@@ -115,13 +119,24 @@ func NewLogFromConf(ctx context.Context, confW *config.Wrapper) (Log, error) {
115
119
// NewNopLog returns an audit log that does nothing.
116
120
func NewNopLog () Log {
117
121
conf := & Conf {confHolder : confHolder {Enabled : false , AccessLogsEnabled : false , DecisionLogsEnabled : false }}
118
- return & logWrapper {conf : conf }
122
+ return newLogWrapper (conf , nil )
123
+ }
124
+
125
+ func newLogWrapper (conf * Conf , backend Log ) * logWrapper {
126
+ lw := & logWrapper {conf : conf }
127
+ if backend != nil {
128
+ lw .backend = backend
129
+ lw .pool = pool .New ().WithMaxGoroutines (runtime .NumCPU ())
130
+ }
131
+
132
+ return lw
119
133
}
120
134
121
135
// logWrapper wraps the backends and enforces the config options.
122
136
type logWrapper struct {
123
137
conf * Conf
124
138
backend Log
139
+ pool * pool.Pool
125
140
}
126
141
127
142
func (lw * logWrapper ) Backend () string {
@@ -137,10 +152,12 @@ func (lw *logWrapper) WriteAccessLogEntry(ctx context.Context, entry AccessLogEn
137
152
return nil
138
153
}
139
154
140
- if err := lw .backend .WriteAccessLogEntry (ctx , entry ); err != nil {
141
- metrics .Inc (ctx , metrics .AuditErrorCount (), metrics .KindKey (KindAccess ))
142
- return err
143
- }
155
+ lw .pool .Go (func () {
156
+ if err := lw .backend .WriteAccessLogEntry (ctx , entry ); err != nil {
157
+ metrics .Inc (ctx , metrics .AuditErrorCount (), metrics .KindKey (KindAccess ))
158
+ logging .FromContext (ctx ).Warn ("Failed to write access log entry" , zap .Error (err ))
159
+ }
160
+ })
144
161
145
162
return nil
146
163
}
@@ -150,16 +167,19 @@ func (lw *logWrapper) WriteDecisionLogEntry(ctx context.Context, entry DecisionL
150
167
return nil
151
168
}
152
169
153
- if err := lw .backend .WriteDecisionLogEntry (ctx , entry ); err != nil {
154
- metrics .Inc (ctx , metrics .AuditErrorCount (), metrics .KindKey (KindDecision ))
155
- return err
156
- }
170
+ lw .pool .Go (func () {
171
+ if err := lw .backend .WriteDecisionLogEntry (ctx , entry ); err != nil {
172
+ metrics .Inc (ctx , metrics .AuditErrorCount (), metrics .KindKey (KindDecision ))
173
+ logging .FromContext (ctx ).Warn ("Failed to write decision log entry" , zap .Error (err ))
174
+ }
175
+ })
157
176
158
177
return nil
159
178
}
160
179
161
180
func (lw * logWrapper ) Close () error {
162
181
if lw .backend != nil {
182
+ lw .pool .Wait ()
163
183
return lw .backend .Close ()
164
184
}
165
185
return nil
0 commit comments