Skip to content

Commit

Permalink
feat: Storage overlay (#1560)
Browse files Browse the repository at this point in the history
Introduces the ability to configure a second fallback storage driver
using a configurable circuit breaker pattern.

Signed-off-by: Sam Lock <sam@swlock.co.uk>
Co-authored-by: Charith Ellawala <charithe@users.noreply.github.com>
  • Loading branch information
Sambigeara and charithe committed May 9, 2023
1 parent 5aca50e commit d0b3f79
Show file tree
Hide file tree
Showing 12 changed files with 663 additions and 10 deletions.
25 changes: 25 additions & 0 deletions docs/modules/configuration/pages/storage.adoc
Expand Up @@ -383,3 +383,28 @@ You can customise the script below to suit your environment. Make sure to specif
include::example$sqlserver_schema.sql[]
----


[#redundancy]
== Redundancy

You can provide redundancy by configuring an `overlay` driver, which wraps a `base` and a `fallback` driver. Under normal operation, the base driver will be targeted as usual. However, if the driver consistently errors, the PDP will start targeting the fallback driver instead. The fallback is determined by a configurable https://learn.microsoft.com/en-us/previous-versions/msp-n-p/dn589784(v=pandp.10)[circuit breaker pattern].

You can configure the fallback error threshold and the fallback error window to determine how many errors can occur within a rolling window before the circuit breaker is tripped.

[source,yaml,linenums]
----
storage:
driver: "overlay"
overlay:
baseDriver: postgres
fallbackDriver: disk
fallbackErrorThreshold: 5 # number of errors that occur within the fallbackErrorWindow to trigger failover
fallbackErrorWindow: 5s # the rolling window in which errors are aggregated
disk:
directory: policies
watchForChanges: true
postgres:
url: "postgres://${PG_USER}:${PG_PASSWORD}@localhost:5432/postgres?sslmode=disable&search_path=cerbos"
----

NOTE: The overlay driver assumes the same interface as the base driver. Any operations that are available on the base driver but not the fallback driver will error if the circuit breaker is open and the fallback driver is being targeted. Likewise, even if the fallback driver supports additional operations compared to the base driver, these will still not be available should failover occur.
6 changes: 6 additions & 0 deletions docs/modules/configuration/partials/fullconfiguration.adoc
Expand Up @@ -157,6 +157,12 @@ storage:
cert: /path/to/certificate
key: /path/to/private_key
caCert: /path/to/CA_certificate
overlay:
# This section is required only if storage.driver is overlay.
baseDriver: blob # Required. BaseDriver is the default storage driver
fallbackDriver: disk # Required. FallbackDriver is the secondary or fallback storage driver
fallbackErrorThreshold: 5 # FallbackErrorThreshold is the max number of errors we allow within the fallbackErrorWindow period
fallbackErrorWindow: 5m # FallbackErrorWindow is the cyclic period within which we aggregate failures
postgres:
# This section is required only if storage.driver is postgres.
connPool:
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -63,6 +63,8 @@ require (
github.com/rs/cors v1.9.0
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/santhosh-tekuri/jsonschema/v5 v5.3.0
github.com/sony/gobreaker v0.5.0
github.com/sourcegraph/conc v0.3.0
github.com/spf13/afero v1.9.5
github.com/stretchr/testify v1.8.2
github.com/tidwall/gjson v1.14.4
Expand Down Expand Up @@ -234,7 +236,6 @@ require (
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/skeema/knownhosts v1.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -2155,6 +2155,8 @@ github.com/snowflakedb/gosnowflake v1.6.3/go.mod h1:6hLajn6yxuJ4xUHZegMekpq9rnQb
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
Expand Down
20 changes: 15 additions & 5 deletions internal/server/server.go
Expand Up @@ -67,6 +67,8 @@ import (

// Import blob to register the storage driver.
_ "github.com/cerbos/cerbos/internal/storage/blob"
"github.com/cerbos/cerbos/internal/storage/overlay"

// Import bundle to register the storage driver.
_ "github.com/cerbos/cerbos/internal/storage/bundle"
// Import mysql to register the storage driver.
Expand Down Expand Up @@ -140,16 +142,24 @@ func Start(ctx context.Context, zpagesEnabled bool) error {
}

var policyLoader engine.PolicyLoader
if bs, ok := store.(storage.BinaryStore); ok {
policyLoader = bs
} else if ss, ok := store.(storage.SourceStore); ok {
switch st := store.(type) {
case storage.BinaryStore:
policyLoader = st
case storage.SourceStore:
// create compile manager
compileMgr, err := compile.NewManager(ctx, ss, schemaMgr)
compileMgr, err := compile.NewManager(ctx, st, schemaMgr)
if err != nil {
return fmt.Errorf("failed to create compile manager: %w", err)
}
policyLoader = compileMgr
} else {
case overlay.Overlay:
// create wrapped policy loader
pl, err := st.GetOverlayPolicyLoader(ctx, schemaMgr)
if err != nil {
return fmt.Errorf("failed to create overlay policy loader: %w", err)
}
policyLoader = pl
default:
return ErrInvalidStore
}

Expand Down
4 changes: 2 additions & 2 deletions internal/storage/blob/store_test.go
Expand Up @@ -61,7 +61,7 @@ func TestNewStore(t *testing.T) {
must := require.New(t)

bucketName := "test"
endpoint := startMinio(ctx, t, bucketName)
endpoint := StartMinio(ctx, t, bucketName)
t.Setenv("AWS_ACCESS_KEY_ID", minioUsername)
t.Setenv("AWS_SECRET_ACCESS_KEY", minioPassword)
conf.Bucket = MinioBucketURL(bucketName, endpoint)
Expand Down Expand Up @@ -132,7 +132,7 @@ func mkAddFn(t *testing.T, bucket *blob.Bucket) internal.MutateStoreFn {
func mkStore(t *testing.T, dir string) (*Store, *blob.Bucket) {
t.Helper()

endpoint := startMinio(context.Background(), t, bucketName)
endpoint := StartMinio(context.Background(), t, bucketName)
conf := mkConf(t, dir, bucketName, endpoint)
bucket, err := newBucket(context.Background(), conf)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/blob/tests.go
Expand Up @@ -85,7 +85,7 @@ func newMinioBucket(ctx context.Context, t *testing.T, prefix string) *blob.Buck
ctx, cancelFunc := context.WithDeadline(ctx, deadline)
defer cancelFunc()

endpoint := startMinio(ctx, t, bucketName)
endpoint := StartMinio(ctx, t, bucketName)

param := UploadParam{
BucketURL: MinioBucketURL(bucketName, endpoint),
Expand Down Expand Up @@ -140,7 +140,7 @@ func uploadDirToBucket(tb testing.TB, ctx context.Context, dir string, bucket *b
return files, err
}

func startMinio(ctx context.Context, t *testing.T, bucketName string) string {
func StartMinio(ctx context.Context, t *testing.T, bucketName string) string {
t.Helper()
is := require.New(t)
pool, err := dockertest.NewPool("")
Expand Down
74 changes: 74 additions & 0 deletions internal/storage/overlay/conf.go
@@ -0,0 +1,74 @@
// Copyright 2021-2023 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0

package overlay

import (
"errors"
"time"

"github.com/cerbos/cerbos/internal/config"
"github.com/cerbos/cerbos/internal/storage"
"go.uber.org/multierr"
)

const (
confKey = storage.ConfKey + ".overlay"
defaultFallbackErrorThreshold = 5
defaultFallbackErrorWindow = 5 * time.Minute
)

// Conf is required (if driver is set to 'overlay') configuration for overlay storage driver.
// +desc=This section is required only if storage.driver is overlay.
type Conf struct {
// BaseDriver is the default storage driver
BaseDriver string `yaml:"baseDriver" conf:"required,example=blob"`
// FallbackDriver is the secondary or fallback storage driver
FallbackDriver string `yaml:"fallbackDriver" conf:"required,example=disk"`
// FallbackErrorThreshold is the max number of errors we allow within the fallbackErrorWindow period
FallbackErrorThreshold int `yaml:"fallbackErrorThreshold,omitempty" conf:",example=5"`
// FallbackErrorWindow is the cyclic period within which we aggregate failures
FallbackErrorWindow time.Duration `yaml:"fallbackErrorWindow" conf:",example=5m"`
}

func (conf *Conf) Key() string {
return confKey
}

func (conf *Conf) Validate() error {
var errs []error

if conf.BaseDriver == "" {
errs = append(errs, errors.New("baseDriver is required"))
}

if conf.FallbackDriver == "" {
errs = append(errs, errors.New("fallbackDriver is required"))
}

if conf.BaseDriver != "" && conf.BaseDriver == conf.FallbackDriver {
errs = append(errs, errors.New("baseDriver and fallbackDriver cannot be the same"))
}

if len(errs) > 0 {
return multierr.Combine(errs...)
}

return nil
}

func (conf *Conf) SetDefaults() {
if conf.FallbackErrorThreshold == 0 {
conf.FallbackErrorThreshold = defaultFallbackErrorThreshold
}
if conf.FallbackErrorWindow == 0 {
conf.FallbackErrorWindow = defaultFallbackErrorWindow
}
}

func GetConf() (*Conf, error) {
conf := &Conf{}
err := config.GetSection(conf)

return conf, err
}
19 changes: 19 additions & 0 deletions internal/storage/overlay/overlay.go
@@ -0,0 +1,19 @@
// Copyright 2021-2023 Zenauth Ltd.
// SPDX-License-Identifier: Apache-2.0

package overlay

import (
"context"

"github.com/cerbos/cerbos/internal/engine"
"github.com/cerbos/cerbos/internal/schema"
)

// The interface is defined here because placing in storage causes a circular dependency,
// probably because it blurs the lines by implementing `SourceStore` whilst having a dependency on
// `schema` in order to build the compile managers in the GetOverlayPolicyLoader method.
type Overlay interface {
// GetOverlayPolicyLoader returns a PolicyLoader implementation that wraps two SourceStores
GetOverlayPolicyLoader(ctx context.Context, schemaMgr schema.Manager) (engine.PolicyLoader, error)
}

0 comments on commit d0b3f79

Please sign in to comment.