Skip to content

Commit

Permalink
enhancement: Clean-up store resources (#1749)
Browse files Browse the repository at this point in the history
* enhancement: Clean-up store resources

The new version of tar-fs does not have to read the entire archive into
memory. However, it requires the tar file handle to stay open for the
lifetime of the FS. Because the index did not hold closable resources
until now, this PR adds the capability to clean-up resources held by the
index. It also fixes a few places where resources held by stores were not
getting cleaned up properly.

Signed-off-by: Charith Ellawala <charith@cerbos.dev>

* Fix lint warnings

Signed-off-by: Charith Ellawala <charith@cerbos.dev>

---------

Signed-off-by: Charith Ellawala <charith@cerbos.dev>
  • Loading branch information
charithe committed Aug 15, 2023
1 parent 5b25738 commit e2c7af0
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 15 deletions.
1 change: 1 addition & 0 deletions cmd/cerbos/compile/compile.go
Expand Up @@ -96,6 +96,7 @@ func (c *Cmd) Run(k *kong.Kong) error {
}

store := disk.NewFromIndexWithConf(idx, &disk.Conf{})
defer store.Close()

enforcement := internalschema.EnforcementReject
if c.IgnoreSchemas {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -50,7 +50,7 @@ require (
github.com/lestrrat-go/jwx/v2 v2.0.12
github.com/mattn/go-isatty v0.0.19
github.com/minio/minio-go/v7 v7.0.61
github.com/nlepage/go-tarfs v1.1.0
github.com/nlepage/go-tarfs v1.2.0
github.com/oklog/ulid/v2 v2.1.0
github.com/olekukonko/tablewriter v0.0.5
github.com/ory/dockertest/v3 v3.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -650,8 +650,8 @@ github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nlepage/go-tarfs v1.1.0 h1:bsACOiZMB/zFjYG/sE01070i9Fl26MnRpw0L6WuyfVs=
github.com/nlepage/go-tarfs v1.1.0/go.mod h1:IhxRcLhLkawBetnwu/JNuoPkq/6cclAllhgEa6SmzS8=
github.com/nlepage/go-tarfs v1.2.0 h1:UDFlDHRCjTjvUxMpZ6K2JzDwj6O3gPZto/eQYDcsSbQ=
github.com/nlepage/go-tarfs v1.2.0/go.mod h1:rno18mpMy9aEH1IiJVftFsqPyIpwqSUiAOpJYjlV2NA=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
Expand Down
4 changes: 4 additions & 0 deletions internal/storage/bundle/bundle.go
Expand Up @@ -244,5 +244,9 @@ func (b *Bundle) LoadSchema(_ context.Context, path string) (io.ReadCloser, erro
}

func (b *Bundle) Release() error {
return b.Close()
}

func (b *Bundle) Close() error {
return b.cleanup()
}
13 changes: 13 additions & 0 deletions internal/storage/bundle/remote_source.go
Expand Up @@ -460,3 +460,16 @@ func (s *RemoteSource) Reload(ctx context.Context) error {
func (s *RemoteSource) SourceKind() string {
return "remote"
}

func (s *RemoteSource) Close() error {
s.mu.Lock()
defer s.mu.Unlock()

if s.bundle != nil {
err := s.bundle.Close()
s.bundle = nil
return err
}

return nil
}
10 changes: 10 additions & 0 deletions internal/storage/bundle/remote_source_test.go
Expand Up @@ -32,6 +32,7 @@ func TestRemoteSource(t *testing.T) {

rs, err := bundle.NewRemoteSource(mkConf(t, true))
require.NoError(t, err, "Failed to create remote source")
t.Cleanup(func() { _ = rs.Close() })
require.NoError(t, rs.InitWithClient(context.Background(), mockClient), "Failed to init")

ids, err := rs.ListPolicyIDs(context.Background(), storage.ListPolicyIDsParams{IncludeDisabled: true})
Expand All @@ -46,6 +47,7 @@ func TestRemoteSource(t *testing.T) {

rs, err := bundle.NewRemoteSource(mkConf(t, true))
require.NoError(t, err, "Failed to create remote source")
t.Cleanup(func() { _ = rs.Close() })
require.NoError(t, rs.InitWithClient(context.Background(), mockClient), "Failed to init")

ids, err := rs.ListPolicyIDs(context.Background(), storage.ListPolicyIDsParams{IncludeDisabled: true})
Expand All @@ -60,6 +62,7 @@ func TestRemoteSource(t *testing.T) {

rs, err := bundle.NewRemoteSource(mkConf(t, true))
require.NoError(t, err, "Failed to create remote source")
t.Cleanup(func() { _ = rs.Close() })
require.Error(t, rs.InitWithClient(context.Background(), mockClient), "Expected error")

require.False(t, rs.IsHealthy(), "Source should be unhealthy")
Expand All @@ -75,6 +78,7 @@ func TestRemoteSource(t *testing.T) {

rs, err := bundle.NewRemoteSource(mkConf(t, true))
require.NoError(t, err, "Failed to create remote source")
t.Cleanup(func() { _ = rs.Close() })
require.NoError(t, rs.InitWithClient(context.Background(), mockClient), "Failed to init")

require.NoError(t, rs.Reload(context.Background()), "Failed to reload")
Expand All @@ -93,6 +97,7 @@ func TestRemoteSource(t *testing.T) {

rs, err := bundle.NewRemoteSource(mkConf(t, false))
require.NoError(t, err, "Failed to create remote source")
t.Cleanup(func() { _ = rs.Close() })
require.NoError(t, rs.InitWithClient(context.Background(), mockClient), "Failed to init")

ids, err := rs.ListPolicyIDs(context.Background(), storage.ListPolicyIDsParams{IncludeDisabled: true})
Expand All @@ -114,6 +119,7 @@ func TestRemoteSource(t *testing.T) {
Once()

rs, err := bundle.NewRemoteSource(mkConf(t, false))
t.Cleanup(func() { _ = rs.Close() })
require.NoError(t, err, "Failed to create remote source")

ctx, cancelFn := context.WithCancel(context.Background())
Expand Down Expand Up @@ -152,6 +158,7 @@ func TestRemoteSource(t *testing.T) {

rs, err := bundle.NewRemoteSource(mkConf(t, false))
require.NoError(t, err, "Failed to create remote source")
t.Cleanup(func() { _ = rs.Close() })

ctx, cancelFn := context.WithCancel(context.Background())
t.Cleanup(cancelFn)
Expand Down Expand Up @@ -192,6 +199,7 @@ func TestRemoteSource(t *testing.T) {

rs, err := bundle.NewRemoteSource(mkConf(t, false))
require.NoError(t, err, "Failed to create remote source")
t.Cleanup(func() { _ = rs.Close() })

ctx, cancelFn := context.WithCancel(context.Background())
t.Cleanup(cancelFn)
Expand Down Expand Up @@ -237,6 +245,7 @@ func TestRemoteSource(t *testing.T) {

rs, err := bundle.NewRemoteSource(mkConf(t, false))
require.NoError(t, err, "Failed to create remote source")
t.Cleanup(func() { _ = rs.Close() })

ctx, cancelFn := context.WithCancel(context.Background())
t.Cleanup(cancelFn)
Expand Down Expand Up @@ -278,6 +287,7 @@ func TestRemoteSource(t *testing.T) {

rs, err := bundle.NewRemoteSource(mkConf(t, false))
require.NoError(t, err, "Failed to create remote source")
t.Cleanup(func() { _ = rs.Close() })

ctx, cancelFn := context.WithCancel(context.Background())
t.Cleanup(cancelFn)
Expand Down
16 changes: 15 additions & 1 deletion internal/storage/bundle/store.go
Expand Up @@ -9,11 +9,13 @@ import (
"fmt"
"io"

"go.uber.org/multierr"
"go.uber.org/zap"

runtimev1 "github.com/cerbos/cerbos/api/genpb/cerbos/runtime/v1"
"github.com/cerbos/cerbos/internal/config"
"github.com/cerbos/cerbos/internal/namer"
"github.com/cerbos/cerbos/internal/storage"
"go.uber.org/zap"
)

const DriverName = "bundle"
Expand Down Expand Up @@ -123,3 +125,15 @@ func (hs *HybridStore) GetFirstMatch(ctx context.Context, candidates []namer.Mod
func (hs *HybridStore) SourceKind() string {
return "hybrid"
}

func (hs *HybridStore) Close() (outErr error) {
if c, ok := hs.remote.(io.Closer); ok {
outErr = multierr.Append(outErr, c.Close())
}

if c, ok := hs.local.(io.Closer); ok {
outErr = multierr.Append(outErr, c.Close())
}

return outErr
}
4 changes: 4 additions & 0 deletions internal/storage/disk/disk.go
Expand Up @@ -132,3 +132,7 @@ func (s *Store) Reload(ctx context.Context) error {

return nil
}

func (s *Store) Close() error {
return s.idx.Close()
}
1 change: 1 addition & 0 deletions internal/storage/disk/disk_test.go
Expand Up @@ -30,6 +30,7 @@ func mkStore(t *testing.T, dir string) *Store {

store, err := NewStore(context.Background(), &Conf{Directory: dir})
require.NoError(t, err)
t.Cleanup(func() { _ = store.Close() })

return store
}
Expand Down
4 changes: 4 additions & 0 deletions internal/storage/index/builder.go
Expand Up @@ -6,6 +6,7 @@ package index
import (
"context"
"fmt"
"io"
"io/fs"
"path"

Expand Down Expand Up @@ -137,6 +138,9 @@ func build(ctx context.Context, fsys fs.FS, opts buildOptions) (Index, error) {
return nil
})
if err != nil {
if c, ok := fsys.(io.Closer); ok {
_ = c.Close()
}
return nil, err
}

Expand Down
8 changes: 8 additions & 0 deletions internal/storage/index/index.go
Expand Up @@ -40,6 +40,7 @@ type Entry struct {
}

type Index interface {
io.Closer
storage.Instrumented
GetFirstMatch([]namer.ModuleID) (*policy.CompilationUnit, error)
GetCompilationUnits(...namer.ModuleID) (map[namer.ModuleID]*policy.CompilationUnit, error)
Expand Down Expand Up @@ -517,3 +518,10 @@ func (idx *index) Reload(ctx context.Context) ([]storage.Event, error) {

return []storage.Event{storage.NewReloadEvent()}, nil
}

func (idx *index) Close() error {
if c, ok := idx.fsys.(io.Closer); ok {
return c.Close()
}
return nil
}
1 change: 1 addition & 0 deletions internal/storage/index/index_test.go
Expand Up @@ -50,6 +50,7 @@ func TestIndexLoadPolicy(t *testing.T) {
require.NoError(t, err)
idx, err := index.Build(context.Background(), fsys)
require.NoError(t, err)
t.Cleanup(func() { _ = idx.Close() })

t.Run("should load the policies", func(t *testing.T) {
policies, err := idx.LoadPolicy(context.Background(), policyFiles...)
Expand Down
21 changes: 18 additions & 3 deletions internal/storage/overlay/store.go
Expand Up @@ -9,17 +9,20 @@ import (
"fmt"
"io"

runtimev1 "github.com/cerbos/cerbos/api/genpb/cerbos/runtime/v1"
"go.uber.org/multierr"
"go.uber.org/zap"

runtimev1 "github.com/cerbos/cerbos/api/genpb/cerbos/runtime/v1"

"github.com/sony/gobreaker"
"github.com/sourcegraph/conc/pool"

"github.com/cerbos/cerbos/internal/compile"
"github.com/cerbos/cerbos/internal/config"
"github.com/cerbos/cerbos/internal/engine"
"github.com/cerbos/cerbos/internal/namer"
"github.com/cerbos/cerbos/internal/schema"
"github.com/cerbos/cerbos/internal/storage"
"github.com/sony/gobreaker"
"github.com/sourcegraph/conc/pool"
)

const DriverName = "overlay"
Expand Down Expand Up @@ -219,3 +222,15 @@ func (s *Store) Reload(ctx context.Context) error {

return p.Wait()
}

func (s *Store) Close() (outErr error) {
if c, ok := s.baseStore.(io.Closer); ok {
outErr = multierr.Append(outErr, c.Close())
}

if c, ok := s.fallbackStore.(io.Closer); ok {
outErr = multierr.Append(outErr, c.Close())
}

return outErr
}
41 changes: 41 additions & 0 deletions internal/test/mocks/Index.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 25 additions & 8 deletions internal/util/filesystem.go
Expand Up @@ -15,6 +15,7 @@ import (
"strings"

"github.com/nlepage/go-tarfs"
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -82,12 +83,30 @@ func IsArchiveFile(fileName string) bool {
return IsZip(fileName) || IsTar(fileName) || IsGzip(fileName)
}

func getFsFromTar(r io.Reader) (fs.FS, error) {
func getFsFromTar(r io.Reader, closers ...io.Closer) (fs.FS, error) {
tfs, err := tarfs.New(r)
if err != nil {
for _, c := range closers {
_ = c.Close()
}
return nil, fmt.Errorf("failed to open tar file: %w", err)
}
return tfs, nil

return ClosableFS{FS: tfs, closers: closers}, nil
}

type ClosableFS struct {
fs.FS
io.Closer
closers []io.Closer
}

func (cfs ClosableFS) Close() (outErr error) {
for _, c := range cfs.closers {
outErr = multierr.Append(outErr, c.Close())
}

return outErr
}

// OpenDirectoryFS attempts to open a directory FS at the given location. It'll initially check if the target file is an archive,
Expand All @@ -101,29 +120,27 @@ func OpenDirectoryFS(path string) (fs.FS, error) {
if err != nil {
return nil, fmt.Errorf("failed to open zip file: %w", err)
}
return zr, nil
return ClosableFS{FS: zr, closers: []io.Closer{zr}}, nil
case IsTar(path):
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("failed to open tar file: %w", err)
}
defer f.Close()

return getFsFromTar(f)
return getFsFromTar(f, f)
case IsGzip(path):
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("failed to open gzip file: %w", err)
}
defer f.Close()

gzr, err := gzip.NewReader(f)
if err != nil {
_ = f.Close()
return nil, fmt.Errorf("failed to open gzip file: %w", err)
}
defer gzr.Close()

return getFsFromTar(gzr)
return getFsFromTar(gzr, gzr, f)
}

return os.DirFS(path), nil
Expand Down

0 comments on commit e2c7af0

Please sign in to comment.