Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent/cache: Store leases in-order in persistent cache so that restore respects dependencies #12843

Merged
merged 11 commits into from Oct 27, 2021
3 changes: 3 additions & 0 deletions changelog/12843.txt
@@ -0,0 +1,3 @@
```release-note:improvement
agent/cache: Process persistent cache leases in dependency order during restore to ensure child leases are always correctly restored
```
230 changes: 176 additions & 54 deletions command/agent/cache/cacheboltdb/bolt.go
Expand Up @@ -2,6 +2,7 @@ package cacheboltdb

import (
"context"
"encoding/binary"
"fmt"
"os"
"path/filepath"
Expand All @@ -17,7 +18,7 @@ import (
const (
// Keep track of schema version for future migrations
storageVersionKey = "version"
storageVersion = "1"
storageVersion = "2" // v2 merges auth-lease and secret-lease buckets into one ordered bucket

// DatabaseFileName - filename for the persistent cache file
DatabaseFileName = "vault-agent-cache.db"
Expand All @@ -26,15 +27,29 @@ const (
// bootstrapping keys
metaBucketName = "meta"

// SecretLeaseType - Bucket/type for leases with secret info
SecretLeaseType = "secret-lease"
// DEPRECATED: secretLeaseType - v1 Bucket/type for leases with secret info
secretLeaseType = "secret-lease"

// AuthLeaseType - Bucket/type for leases with auth info
AuthLeaseType = "auth-lease"
// DEPRECATED: authLeaseType - v1 Bucket/type for leases with auth info
authLeaseType = "auth-lease"

// TokenType - Bucket/type for auto-auth tokens
TokenType = "token"

// LeaseType - v2 Bucket/type for auth AND secret leases.
//
// This bucket stores keys in the same order they were created using
// auto-incrementing keys and the fact that BoltDB stores keys in byte
// slice order. This means when we iterate through this bucket during
// restore, we will always restore parent tokens before their children,
// allowing us to correctly attach child contexts to their parent's context.
LeaseType = "lease"

// lookupType - v2 Bucket/type to map from a memcachedb index ID to an
// auto-incrementing BoltDB key. Facilitates deletes from the lease
// bucket using an ID instead of the auto-incrementing BoltDB key.
lookupType = "lookup"
tomhjp marked this conversation as resolved.
Show resolved Hide resolved

// AutoAuthToken - key for the latest auto-auth token
AutoAuthToken = "auto-auth-token"

Expand Down Expand Up @@ -71,7 +86,7 @@ func NewBoltStorage(config *BoltStorageConfig) (*BoltStorage, error) {
return nil, err
}
err = db.Update(func(tx *bolt.Tx) error {
return createBoltSchema(tx)
return createBoltSchema(tx, storageVersion)
})
if err != nil {
return nil, err
Expand All @@ -85,39 +100,130 @@ func NewBoltStorage(config *BoltStorageConfig) (*BoltStorage, error) {
return bs, nil
}

func createBoltSchema(tx *bolt.Tx) error {
// create the meta bucket at the top level
func createBoltSchema(tx *bolt.Tx, createVersion string) error {
switch {
case createVersion == "1":
if err := createV1BoltSchema(tx); err != nil {
return err
}
case createVersion == "2":
if err := createV2BoltSchema(tx); err != nil {
return err
}
default:
return fmt.Errorf("schema version %s not supported", createVersion)
}

meta, err := tx.CreateBucketIfNotExists([]byte(metaBucketName))
if err != nil {
return fmt.Errorf("failed to create bucket %s: %w", metaBucketName, err)
}
// check and set file version in the meta bucket

// Check and set file version in the meta bucket.
version := meta.Get([]byte(storageVersionKey))
switch {
case version == nil:
err = meta.Put([]byte(storageVersionKey), []byte(storageVersion))
err = meta.Put([]byte(storageVersionKey), []byte(createVersion))
if err != nil {
return fmt.Errorf("failed to set storage version: %w", err)
}
case string(version) != storageVersion:
return fmt.Errorf("storage migration from %s to %s not implemented", string(version), storageVersion)

return nil

case string(version) == createVersion:
return nil

case string(version) == "1" && createVersion == "2":
return migrateFromV1ToV2Schema(tx)

default:
return fmt.Errorf("storage migration from %s to %s not implemented", string(version), createVersion)
}
}

// createV1BoltSchema ensures the v1 buckets (token bucket plus the two
// separate auth-lease and secret-lease buckets) exist.
func createV1BoltSchema(tx *bolt.Tx) error {
	v1Buckets := []string{TokenType, authLeaseType, secretLeaseType}
	for _, name := range v1Buckets {
		_, err := tx.CreateBucketIfNotExists([]byte(name))
		if err != nil {
			return fmt.Errorf("failed to create %s bucket: %w", name, err)
		}
	}

	return nil
}

// createV2BoltSchema ensures the v2 buckets exist: the token bucket, the
// single ordered lease bucket, and the id-to-key lookup bucket.
func createV2BoltSchema(tx *bolt.Tx) error {
	v2Buckets := []string{TokenType, LeaseType, lookupType}
	for _, name := range v2Buckets {
		_, err := tx.CreateBucketIfNotExists([]byte(name))
		if err != nil {
			return fmt.Errorf("failed to create %s bucket: %w", name, err)
		}
	}

	return nil
}

func migrateFromV1ToV2Schema(tx *bolt.Tx) error {
if err := createV2BoltSchema(tx); err != nil {
return err
}

// create the buckets for tokens and leases
_, err = tx.CreateBucketIfNotExists([]byte(TokenType))
for _, v1BucketType := range []string{authLeaseType, secretLeaseType} {
if bucket := tx.Bucket([]byte(v1BucketType)); bucket != nil {
bucket.ForEach(func(key, value []byte) error {
autoIncKey, err := autoIncrementedLeaseKey(tx, string(key))
if err != nil {
return fmt.Errorf("error migrating %s %q key to auto incremented key: %w", v1BucketType, string(key), err)
tomhjp marked this conversation as resolved.
Show resolved Hide resolved
}
if err := tx.Bucket([]byte(LeaseType)).Put(autoIncKey, value); err != nil {
return fmt.Errorf("error migrating %s %q from v1 to v2 schema: %w", v1BucketType, string(key), err)
}
return nil
})

if err := tx.DeleteBucket([]byte(v1BucketType)); err != nil {
return fmt.Errorf("failed to clean up %s bucket during v1 to v2 schema migration: %w", v1BucketType, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean we need to do the migration once more? Could we end up in a state where a migration will never succeed? If so, perhaps it would make sense to scrap the db and start fresh.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the migration fails, something super weird has happened because migrations should be very rare (we only explicitly support persistent stores in k8s today) and failures should also be very rare because the schema creation/migration code is very lenient. So I'd prefer that an operator gets a chance to clean up/debug before we wipe everything automatically.

}
}
}

meta, err := tx.CreateBucketIfNotExists([]byte(metaBucketName))
if err != nil {
return fmt.Errorf("failed to create token bucket: %w", err)
return fmt.Errorf("failed to create meta bucket: %w", err)
}
_, err = tx.CreateBucketIfNotExists([]byte(AuthLeaseType))
if err := meta.Put([]byte(storageVersionKey), []byte(storageVersion)); err != nil {
return fmt.Errorf("failed to update schema from v1 to v2: %w", err)
}

return nil
}

// autoIncrementedLeaseKey generates the next auto-incrementing key for the
// lease bucket and records an id -> key entry in the lookup bucket so the
// lease can later be deleted by its index id. Returns the 8-byte key.
func autoIncrementedLeaseKey(tx *bolt.Tx, id string) ([]byte, error) {
	leaseBucket := tx.Bucket([]byte(LeaseType))
	keyValue, err := leaseBucket.NextSequence()
	if err != nil {
		return nil, fmt.Errorf("failed to generate lookup key for id %q: %w", id, err)
	}

	key := make([]byte, 8)
	// MUST be big endian, because keys are ordered by byte slice comparison
	// which progressively compares each byte in the slice starting at index 0.
	// BigEndian in the range [255-257] looks like this:
	// [0 0 0 0 0 0 0 255]
	// [0 0 0 0 0 0 1 0]
	// [0 0 0 0 0 0 1 1]
	// LittleEndian in the same range looks like this:
	// [255 0 0 0 0 0 0 0]
	// [0 1 0 0 0 0 0 0]
	// [1 1 0 0 0 0 0 0]
	binary.BigEndian.PutUint64(key, keyValue)

	if err := tx.Bucket([]byte(lookupType)).Put([]byte(id), key); err != nil {
		return nil, err
	}

	return key, nil
}

// Set an index (token or lease) in bolt storage
Expand All @@ -133,44 +239,56 @@ func (b *BoltStorage) Set(ctx context.Context, id string, plaintext []byte, inde
}

return b.db.Update(func(tx *bolt.Tx) error {
s := tx.Bucket([]byte(indexType))
if s == nil {
return fmt.Errorf("bucket %q not found", indexType)
}
// If this is an auto-auth token, also stash it in the meta bucket for
// easy retrieval upon restore
if indexType == TokenType {
var key []byte
switch indexType {
case LeaseType:
// If this is a lease type, generate an auto-incrementing key and
// store an ID -> key lookup entry
key, err = autoIncrementedLeaseKey(tx, id)
if err != nil {
return err
}
case TokenType:
// If this is an auto-auth token, also stash it in the meta bucket for
// easy retrieval upon restore
key = []byte(id)
meta := tx.Bucket([]byte(metaBucketName))
if err := meta.Put([]byte(AutoAuthToken), protoBlob); err != nil {
return fmt.Errorf("failed to set latest auto-auth token: %w", err)
}
default:
return fmt.Errorf("called Set for unsupported type %q", indexType)
}
return s.Put([]byte(id), protoBlob)
})
}

func getBucketIDs(b *bolt.Bucket) ([][]byte, error) {
ids := [][]byte{}
err := b.ForEach(func(k, v []byte) error {
ids = append(ids, k)
return nil
s := tx.Bucket([]byte(indexType))
if s == nil {
return fmt.Errorf("bucket %q not found", indexType)
}
return s.Put(key, protoBlob)
})
return ids, err
}

// Delete an index (token or lease) by id from bolt storage
func (b *BoltStorage) Delete(id string) error {
// Delete an index (token or lease) by key from bolt storage
func (b *BoltStorage) Delete(id string, indexType string) error {
return b.db.Update(func(tx *bolt.Tx) error {
// Since Delete returns a nil error if the key doesn't exist, just call
// delete in all three index buckets without checking existence first
if err := tx.Bucket([]byte(TokenType)).Delete([]byte(id)); err != nil {
return fmt.Errorf("failed to delete %q from token bucket: %w", id, err)
key := []byte(id)
if indexType == LeaseType {
key = tx.Bucket([]byte(lookupType)).Get(key)
if key == nil {
return fmt.Errorf("failed to lookup bolt DB key for id %q", id)
}

err := tx.Bucket([]byte(lookupType)).Delete([]byte(id))
if err != nil {
return fmt.Errorf("failed to delete %q from lookup bucket: %w", id, err)
}
}
if err := tx.Bucket([]byte(AuthLeaseType)).Delete([]byte(id)); err != nil {
return fmt.Errorf("failed to delete %q from auth lease bucket: %w", id, err)

bucket := tx.Bucket([]byte(indexType))
if bucket == nil {
return fmt.Errorf("bucket %q not found during delete", indexType)
}
if err := tx.Bucket([]byte(SecretLeaseType)).Delete([]byte(id)); err != nil {
return fmt.Errorf("failed to delete %q from secret lease bucket: %w", id, err)
if err := bucket.Delete(key); err != nil {
return fmt.Errorf("failed to delete %q from %q bucket: %w", id, indexType, err)
}
b.logger.Trace("deleted index from bolt db", "id", id)
return nil
Expand All @@ -193,10 +311,14 @@ func (b *BoltStorage) GetByType(ctx context.Context, indexType string) ([][]byte
err := b.db.View(func(tx *bolt.Tx) error {
var errors *multierror.Error

tx.Bucket([]byte(indexType)).ForEach(func(id, ciphertext []byte) error {
bucket := tx.Bucket([]byte(indexType))
if bucket == nil {
return fmt.Errorf("bucket %q not found", indexType)
}
bucket.ForEach(func(key, ciphertext []byte) error {
plaintext, err := b.decrypt(ctx, ciphertext)
if err != nil {
errors = multierror.Append(errors, fmt.Errorf("error decrypting index id %s: %w", id, err))
errors = multierror.Append(errors, fmt.Errorf("error decrypting entry %s: %w", key, err))
return nil
}

Expand Down Expand Up @@ -247,11 +369,11 @@ func (b *BoltStorage) GetRetrievalToken() ([]byte, error) {
var token []byte

err := b.db.View(func(tx *bolt.Tx) error {
keyBucket := tx.Bucket([]byte(metaBucketName))
if keyBucket == nil {
metaBucket := tx.Bucket([]byte(metaBucketName))
if metaBucket == nil {
return fmt.Errorf("bucket %q not found", metaBucketName)
}
value := keyBucket.Get([]byte(RetrievalTokenMaterial))
value := metaBucket.Get([]byte(RetrievalTokenMaterial))
if value != nil {
token = make([]byte, len(value))
copy(token, value)
Expand Down Expand Up @@ -286,13 +408,13 @@ func (b *BoltStorage) Close() error {
// the schema/layout
func (b *BoltStorage) Clear() error {
return b.db.Update(func(tx *bolt.Tx) error {
for _, name := range []string{AuthLeaseType, SecretLeaseType, TokenType} {
for _, name := range []string{TokenType, LeaseType, lookupType} {
b.logger.Trace("deleting bolt bucket", "name", name)
if err := tx.DeleteBucket([]byte(name)); err != nil {
return err
}
}
return createBoltSchema(tx)
return createBoltSchema(tx, storageVersion)
})
}

Expand Down