-
Notifications
You must be signed in to change notification settings - Fork 123
/
store.go
252 lines (210 loc) · 7.07 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
// Copyright 2021-2023 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0
package storage
import (
"context"
"fmt"
"io"
"sync"
runtimev1 "github.com/cerbos/cerbos/api/genpb/cerbos/runtime/v1"
schemav1 "github.com/cerbos/cerbos/api/genpb/cerbos/schema/v1"
"github.com/cerbos/cerbos/internal/config"
"github.com/cerbos/cerbos/internal/namer"
"github.com/cerbos/cerbos/internal/policy"
)
var (
driversMu sync.RWMutex
drivers = map[string]Constructor{}
)
// InvalidPolicyError is a custom error to signal that a policy is invalid.
type InvalidPolicyError struct {
Err error
Message string
}
func (ipe InvalidPolicyError) Error() string {
return fmt.Sprintf("%s: %v", ipe.Message, ipe.Err)
}
func (ipe InvalidPolicyError) Unwrap() error {
return ipe.Err
}
func NewInvalidPolicyError(err error, msg string, args ...any) InvalidPolicyError {
return InvalidPolicyError{Message: fmt.Sprintf(msg, args...), Err: err}
}
// InvalidSchemaError is a custom error to signal that a schema is invalid.
type InvalidSchemaError struct {
Err error
Message string
}
func (ise InvalidSchemaError) Error() string {
return fmt.Sprintf("%s: %v", ise.Message, ise.Err)
}
func (ise InvalidSchemaError) Unwrap() error {
return ise.Err
}
func NewInvalidSchemaError(err error, msg string, args ...any) InvalidSchemaError {
return InvalidSchemaError{Message: fmt.Sprintf(msg, args...), Err: err}
}
// Constructor is a constructor function for a storage driver.
type Constructor func(context.Context, *config.Wrapper) (Store, error)
// RegisterDriver registers a storage driver.
func RegisterDriver(name string, cons Constructor) {
driversMu.Lock()
defer driversMu.Unlock()
drivers[name] = cons
}
// GetDriverConstructor registers a storage driver.
func GetDriverConstructor(name string) (Constructor, error) {
driversMu.RLock()
defer driversMu.RUnlock()
cons, ok := drivers[name]
if !ok {
return nil, fmt.Errorf("unknown storage driver [%s]", name)
}
return cons, nil
}
// New returns a storage driver implementation based on the configured driver.
func New(ctx context.Context) (Store, error) {
return NewFromConf(ctx, config.Global())
}
// NewFromConf returns a storage driver implementation based on the provided configuration.
func NewFromConf(ctx context.Context, confWrapper *config.Wrapper) (Store, error) {
conf := new(Conf)
if err := confWrapper.GetSection(conf); err != nil {
return nil, fmt.Errorf("failed to get storage driver configuration: %w", err)
}
driversMu.RLock()
cons, ok := drivers[conf.Driver]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("unknown storage driver [%s]", conf.Driver)
}
return cons(ctx, confWrapper)
}
type ListPolicyIDsParams struct {
NameRegexp string
ScopeRegexp string
VersionRegexp string
IncludeDisabled bool
}
// Store is the common interface implemented by storage backends.
type Store interface {
// Driver is the name of the storage backend implementation.
Driver() string
// ListPolicyIDs returns the policy IDs in the store
ListPolicyIDs(context.Context, ListPolicyIDsParams) ([]string, error)
// ListSchemaIDs returns the schema ids in the store
ListSchemaIDs(context.Context) ([]string, error)
// LoadSchema loads the given schema from the store.
LoadSchema(context.Context, string) (io.ReadCloser, error)
}
// SourceStore is implemented by stores that have policies in their source format (uncompiled).
type SourceStore interface {
Store
Subscribable
// GetFirstMatch searches for the given module IDs in order and returns the first one found.
GetFirstMatch(context.Context, []namer.ModuleID) (*policy.CompilationUnit, error)
// GetCompilationUnits gets the compilation units for the given module IDs.
GetCompilationUnits(context.Context, ...namer.ModuleID) (map[namer.ModuleID]*policy.CompilationUnit, error)
// GetDependents returns the dependents of the given modules.
GetDependents(context.Context, ...namer.ModuleID) (map[namer.ModuleID][]namer.ModuleID, error)
// LoadPolicy loads the given policy from the store
LoadPolicy(context.Context, ...string) ([]*policy.Wrapper, error)
}
// BinaryStore is implemented by stores that have pre-compiled policies in binary format.
type BinaryStore interface {
Store
// GetFirstMatch searches for the given module IDs in order and returns the first one found.
GetFirstMatch(context.Context, []namer.ModuleID) (*runtimev1.RunnablePolicySet, error)
}
// MutableStore is a store that allows mutations.
type MutableStore interface {
Store
AddOrUpdate(context.Context, ...policy.Wrapper) error
AddOrUpdateSchema(context.Context, ...*schemav1.Schema) error
Disable(context.Context, ...string) (uint32, error)
Enable(context.Context, ...string) (uint32, error)
DeleteSchema(context.Context, ...string) (uint32, error)
Delete(context.Context, ...namer.ModuleID) error
}
// Verifiable stores allow querying whether the requirements for the store are met.
type Verifiable interface {
CheckSchema(ctx context.Context) error
}
// Reloadable stores allow reloading their contents.
type Reloadable interface {
Reload(context.Context) error
}
// Instrumented stores expose repository stats.
type Instrumented interface {
RepoStats(context.Context) RepoStats
}
// Subscribable is an interface for managing subscriptions to storage events.
type Subscribable interface {
// Subscribe adds a subscriber to listen for storage notifications.
Subscribe(Subscriber)
// Unsubscribe removes a subscriber.
Unsubscribe(Subscriber)
}
// Subscriber is the interface implemented by storage subscribers.
type Subscriber interface {
SubscriberID() string
OnStorageEvent(...Event)
}
// EventKind identifies the kind of storage event such as addition or deletion.
type EventKind int
const (
EventAddOrUpdatePolicy EventKind = iota
EventDeleteOrDisablePolicy
EventAddOrUpdateSchema
EventDeleteSchema
EventReload
EventNop
)
// Event is an event detected by the storage layer.
type Event struct {
OldPolicyID *namer.ModuleID
SchemaFile string
Kind EventKind
PolicyID namer.ModuleID
}
func (evt Event) String() string {
var kind string
id := evt.PolicyID.String()
switch evt.Kind {
case EventAddOrUpdatePolicy:
kind = "ADD/UPDATE"
case EventDeleteOrDisablePolicy:
kind = "DELETE/DISABLE"
case EventAddOrUpdateSchema:
kind = "ADD/UPDATE SCHEMA"
id = evt.SchemaFile
case EventDeleteSchema:
kind = "DELETE SCHEMA"
id = evt.SchemaFile
case EventReload:
kind = "RELOAD"
case EventNop:
kind = "NOP"
default:
kind = "UNKNOWN"
}
return fmt.Sprintf("%s [%s]", kind, id)
}
// NewPolicyEvent creates a new storage event for a policy.
func NewPolicyEvent(kind EventKind, policyID namer.ModuleID) Event {
return Event{Kind: kind, PolicyID: policyID}
}
// NewSchemaEvent creates a new storage event for a schema.
func NewSchemaEvent(kind EventKind, schemaFile string) Event {
return Event{Kind: kind, SchemaFile: schemaFile}
}
// NewReloadEvent creates a new reload event.
func NewReloadEvent() Event {
return Event{Kind: EventReload}
}
type RepoStats struct {
PolicyCount map[policy.Kind]int
AvgRuleCount map[policy.Kind]float64
AvgConditionCount map[policy.Kind]float64
SchemaCount int
}