Skip to content

Commit

Permalink
api: support shared replication state storage
Browse files Browse the repository at this point in the history
- The SharedReplicationStateStore provides cluster replication state
  storage that is easily integrated in the Client's response/request
  callback chains.
  • Loading branch information
benashz committed Oct 13, 2021
1 parent df0089c commit 46fe88f
Show file tree
Hide file tree
Showing 2 changed files with 498 additions and 30 deletions.
163 changes: 135 additions & 28 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (
retryablehttp "github.com/hashicorp/go-retryablehttp"
rootcerts "github.com/hashicorp/go-rootcerts"
"github.com/hashicorp/go-secure-stdlib/parseutil"
"golang.org/x/net/http2"
"golang.org/x/time/rate"

"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/helper/strutil"
"github.com/hashicorp/vault/sdk/logical"
"golang.org/x/net/http2"
"golang.org/x/time/rate"
)

const (
Expand All @@ -49,6 +50,7 @@ const (
EnvVaultMFA = "VAULT_MFA"
EnvRateLimit = "VAULT_RATE_LIMIT"
EnvHTTPProxy = "VAULT_HTTP_PROXY"
HeaderIndex = "X-Vault-Index"
)

// Deprecated values
Expand Down Expand Up @@ -135,6 +137,10 @@ type Config struct {

// CloneHeaders ensures that the source client's headers are copied to its clone.
CloneHeaders bool

// CloneReplicationStateStore ensures that the source client's ReplicationStateStore
// is registered in the clone.
CloneReplicationStateStore bool
}

// TLSConfig contains the parameters needed to configure TLS on the HTTP client
Expand Down Expand Up @@ -415,16 +421,17 @@ func parseRateLimit(val string) (rate float64, burst int, err error) {

// Client is the client to the Vault API. Create a client with NewClient.
type Client struct {
modifyLock sync.RWMutex
addr *url.URL
config *Config
token string
headers http.Header
wrappingLookupFunc WrappingLookupFunc
mfaCreds []string
policyOverride bool
requestCallbacks []RequestCallback
responseCallbacks []ResponseCallback
modifyLock sync.RWMutex
addr *url.URL
config *Config
token string
headers http.Header
wrappingLookupFunc WrappingLookupFunc
mfaCreds []string
policyOverride bool
requestCallbacks []RequestCallback
responseCallbacks []ResponseCallback
replicationStateStore ReplicationStateStore
}

// NewClient returns a new client for the given configuration.
Expand Down Expand Up @@ -512,6 +519,31 @@ func NewClient(c *Config) (*Client, error) {
return client, nil
}

// RegisterReplicationStateStore for tracking replication states across all requests and responses.
// The ReplicationStateStore will be registered in both the request and response callback registries.
func (c *Client) RegisterReplicationStateStore(store ReplicationStateStore) error {
if c.replicationStateStore != nil {
return fmt.Errorf("replication state store already registered")
}

c.modifyLock.Lock()
defer c.modifyLock.Unlock()

c.replicationStateStore = store

if len(c.requestCallbacks) == 0 {
c.requestCallbacks = []RequestCallback{}
}
c.requestCallbacks = append(c.requestCallbacks, c.replicationStateStore.HandleRequest)

if len(c.responseCallbacks) == 0 {
c.responseCallbacks = []ResponseCallback{}
}
c.responseCallbacks = append(c.responseCallbacks, c.replicationStateStore.HandleResponse)

return nil
}

func (c *Client) CloneConfig() *Config {
c.modifyLock.RLock()
defer c.modifyLock.RUnlock()
Expand All @@ -530,6 +562,7 @@ func (c *Client) CloneConfig() *Config {
newConfig.OutputCurlString = c.config.OutputCurlString
newConfig.SRVLookup = c.config.SRVLookup
newConfig.CloneHeaders = c.config.CloneHeaders
newConfig.CloneReplicationStateStore = c.config.CloneReplicationStateStore

// we specifically want a _copy_ of the client here, not a pointer to the original one
newClient := *c.config.HttpClient
Expand Down Expand Up @@ -855,6 +888,26 @@ func (c *Client) CloneHeaders() bool {
return c.config.CloneHeaders
}

// SetCloneReplicationStateStore to clone the client's ReplicationStateStore
func (c *Client) SetCloneReplicationStateStore(val bool) {
c.modifyLock.Lock()
defer c.modifyLock.Unlock()
c.config.modifyLock.Lock()
defer c.config.modifyLock.Unlock()

c.config.CloneReplicationStateStore = val
}

// CloneReplicationStateStore gets the configured value.
func (c *Client) CloneReplicationStateStore() bool {
c.modifyLock.RLock()
defer c.modifyLock.RUnlock()
c.config.modifyLock.RLock()
defer c.config.modifyLock.RUnlock()

return c.config.CloneReplicationStateStore
}

// Clone creates a new client with the same configuration. Note that the same
// underlying http.Client is used; modifying the client from more than one
// goroutine at once may not be safe, so modify the client as needed and then
Expand All @@ -872,20 +925,21 @@ func (c *Client) Clone() (*Client, error) {
defer config.modifyLock.RUnlock()

newConfig := &Config{
Address: config.Address,
HttpClient: config.HttpClient,
MinRetryWait: config.MinRetryWait,
MaxRetryWait: config.MaxRetryWait,
MaxRetries: config.MaxRetries,
Timeout: config.Timeout,
Backoff: config.Backoff,
CheckRetry: config.CheckRetry,
Logger: config.Logger,
Limiter: config.Limiter,
OutputCurlString: config.OutputCurlString,
AgentAddress: config.AgentAddress,
SRVLookup: config.SRVLookup,
CloneHeaders: config.CloneHeaders,
Address: config.Address,
HttpClient: config.HttpClient,
MinRetryWait: config.MinRetryWait,
MaxRetryWait: config.MaxRetryWait,
MaxRetries: config.MaxRetries,
Timeout: config.Timeout,
Backoff: config.Backoff,
CheckRetry: config.CheckRetry,
Logger: config.Logger,
Limiter: config.Limiter,
OutputCurlString: config.OutputCurlString,
AgentAddress: config.AgentAddress,
SRVLookup: config.SRVLookup,
CloneHeaders: config.CloneHeaders,
CloneReplicationStateStore: config.CloneReplicationStateStore,
}
client, err := NewClient(newConfig)
if err != nil {
Expand All @@ -896,6 +950,16 @@ func (c *Client) Clone() (*Client, error) {
client.SetHeaders(c.Headers().Clone())
}

if config.CloneReplicationStateStore {
if c.replicationStateStore != nil {
if err := client.RegisterReplicationStateStore(c.replicationStateStore); err != nil {
return nil, err
}
} else {
c.config.Logger.Warn("Parent has no ReplicationStateStore and CloneReplicationStateStore is specified")
}
}

return client, nil
}

Expand Down Expand Up @@ -1152,7 +1216,7 @@ func (c *Client) WithResponseCallbacks(callbacks ...ResponseCallback) *Client {
// by Vault in a response header.
func RecordState(state *string) ResponseCallback {
return func(resp *Response) {
*state = resp.Header.Get("X-Vault-Index")
*state = resp.Header.Get(HeaderIndex)
}
}

Expand All @@ -1162,7 +1226,7 @@ func RecordState(state *string) ResponseCallback {
func RequireState(states ...string) RequestCallback {
return func(req *Request) {
for _, s := range states {
req.Headers.Add("X-Vault-Index", s)
req.Headers.Add(HeaderIndex, s)
}
}
}
Expand Down Expand Up @@ -1300,3 +1364,46 @@ func DefaultRetryPolicy(ctx context.Context, resp *http.Response, err error) (bo
}
return false, nil
}

type ReplicationStateStore interface {
HandleResponse(resp *Response)
HandleRequest(resp *Request)
States() []string
}

// SharedReplicationStateStore stores replication states by providing
// ResponseCallback and RequestCallback methods.
// It can be used when Client Controlled Consistency (VLT-146) is required.
// These methods should be registered in the Client's corresponding callback chains.
type SharedReplicationStateStore struct {
m sync.RWMutex
states []string
}

// HandleResponse updates the store's replication states with the merger of all states.
// It should be registered in a Client's requestCallback chain.
func (w *SharedReplicationStateStore) HandleResponse(resp *Response) {
w.m.Lock()
defer w.m.Unlock()
newState := resp.Header.Get(HeaderIndex)
if newState != "" {
w.states = MergeReplicationStates(w.states, newState)
}
}

// HandleRequest updates the request with the store's replication states.
// It should be registered in a Client's responseCallback chain.
func (w *SharedReplicationStateStore) HandleRequest(req *Request) {
w.m.RLock()
defer w.m.RUnlock()
for _, s := range w.states {
req.Headers.Add(HeaderIndex, s)
}
}

// States currently known to the store.
func (w *SharedReplicationStateStore) States() []string {
w.m.Lock()
defer w.m.Unlock()
return w.states
}

0 comments on commit 46fe88f

Please sign in to comment.