From 186fb82c72b59254def34139bfafa0416aa1a5ea Mon Sep 17 00:00:00 2001 From: Ben Ash Date: Tue, 12 Oct 2021 21:38:06 -0400 Subject: [PATCH 1/8] Update go version to allow for running api tests. - rebase off of main --- api/go.mod | 2 +- api/go.sum | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/api/go.mod b/api/go.mod index b0924e260a60e..f7f9b51e66471 100644 --- a/api/go.mod +++ b/api/go.mod @@ -1,6 +1,6 @@ module github.com/hashicorp/vault/api -go 1.13 +go 1.16 replace github.com/hashicorp/vault/sdk => ../sdk diff --git a/api/go.sum b/api/go.sum index d5e55d2f58bae..c79dea5c1bb7d 100644 --- a/api/go.sum +++ b/api/go.sum @@ -311,7 +311,6 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/square/go-jose.v2 v2.5.1 h1:7odma5RETjNHWJnR32wx8t+Io4djHE1PqxCFx3iiZ2w= gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= From 2453c532a267af57f393762b24980161ccf2b1cf Mon Sep 17 00:00:00 2001 From: Ben Ash Date: Tue, 12 Oct 2021 21:48:29 -0400 Subject: [PATCH 2/8] api: support shared replication state storage - The SharedReplicationStateStore provides cluster replication state storage that is easily integrated in the Client's response/request callback chains. --- api/client.go | 163 ++++++++++++++++---- api/client_test.go | 365 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 498 insertions(+), 30 deletions(-) diff --git a/api/client.go b/api/client.go index 34974d742674a..2a53db835135b 100644 --- a/api/client.go +++ b/api/client.go @@ -24,11 +24,12 @@ import ( retryablehttp "github.com/hashicorp/go-retryablehttp" rootcerts "github.com/hashicorp/go-rootcerts" "github.com/hashicorp/go-secure-stdlib/parseutil" + "golang.org/x/net/http2" + "golang.org/x/time/rate" + "github.com/hashicorp/vault/sdk/helper/consts" "github.com/hashicorp/vault/sdk/helper/strutil" "github.com/hashicorp/vault/sdk/logical" - "golang.org/x/net/http2" - "golang.org/x/time/rate" ) const ( @@ -49,6 +50,7 @@ const ( EnvVaultMFA = "VAULT_MFA" EnvRateLimit = "VAULT_RATE_LIMIT" EnvHTTPProxy = "VAULT_HTTP_PROXY" + HeaderIndex = "X-Vault-Index" ) // Deprecated values @@ -135,6 +137,10 @@ type Config struct { // CloneHeaders ensures that the source client's headers are copied to its clone. CloneHeaders bool + + // CloneReplicationStateStore ensures that the source client's ReplicationStateStore + // is registered in the clone. + CloneReplicationStateStore bool } // TLSConfig contains the parameters needed to configure TLS on the HTTP client @@ -415,16 +421,17 @@ func parseRateLimit(val string) (rate float64, burst int, err error) { // Client is the client to the Vault API. Create a client with NewClient. type Client struct { - modifyLock sync.RWMutex - addr *url.URL - config *Config - token string - headers http.Header - wrappingLookupFunc WrappingLookupFunc - mfaCreds []string - policyOverride bool - requestCallbacks []RequestCallback - responseCallbacks []ResponseCallback + modifyLock sync.RWMutex + addr *url.URL + config *Config + token string + headers http.Header + wrappingLookupFunc WrappingLookupFunc + mfaCreds []string + policyOverride bool + requestCallbacks []RequestCallback + responseCallbacks []ResponseCallback + replicationStateStore ReplicationStateStore } // NewClient returns a new client for the given configuration. @@ -512,6 +519,31 @@ func NewClient(c *Config) (*Client, error) { return client, nil } +// RegisterReplicationStateStore for tracking replication states across all requests and responses. +// The ReplicationStateStore will be registered in both the request and response callback registries. +func (c *Client) RegisterReplicationStateStore(store ReplicationStateStore) error { + if c.replicationStateStore != nil { + return fmt.Errorf("replication state store already registered") + } + + c.modifyLock.Lock() + defer c.modifyLock.Unlock() + + c.replicationStateStore = store + + if len(c.requestCallbacks) == 0 { + c.requestCallbacks = []RequestCallback{} + } + c.requestCallbacks = append(c.requestCallbacks, c.replicationStateStore.HandleRequest) + + if len(c.responseCallbacks) == 0 { + c.responseCallbacks = []ResponseCallback{} + } + c.responseCallbacks = append(c.responseCallbacks, c.replicationStateStore.HandleResponse) + + return nil +} + func (c *Client) CloneConfig() *Config { c.modifyLock.RLock() defer c.modifyLock.RUnlock() @@ -530,6 +562,7 @@ func (c *Client) CloneConfig() *Config { newConfig.OutputCurlString = c.config.OutputCurlString newConfig.SRVLookup = c.config.SRVLookup newConfig.CloneHeaders = c.config.CloneHeaders + newConfig.CloneReplicationStateStore = c.config.CloneReplicationStateStore // we specifically want a _copy_ of the client here, not a pointer to the original one newClient := *c.config.HttpClient @@ -855,6 +888,26 @@ func (c *Client) CloneHeaders() bool { return c.config.CloneHeaders } +// SetCloneReplicationStateStore to clone the client's ReplicationStateStore +func (c *Client) SetCloneReplicationStateStore(val bool) { + c.modifyLock.Lock() + defer c.modifyLock.Unlock() + c.config.modifyLock.Lock() + defer c.config.modifyLock.Unlock() + + c.config.CloneReplicationStateStore = val +} + +// CloneReplicationStateStore gets the configured value. +func (c *Client) CloneReplicationStateStore() bool { + c.modifyLock.RLock() + defer c.modifyLock.RUnlock() + c.config.modifyLock.RLock() + defer c.config.modifyLock.RUnlock() + + return c.config.CloneReplicationStateStore +} + // Clone creates a new client with the same configuration. Note that the same // underlying http.Client is used; modifying the client from more than one // goroutine at once may not be safe, so modify the client as needed and then @@ -872,20 +925,21 @@ func (c *Client) Clone() (*Client, error) { defer config.modifyLock.RUnlock() newConfig := &Config{ - Address: config.Address, - HttpClient: config.HttpClient, - MinRetryWait: config.MinRetryWait, - MaxRetryWait: config.MaxRetryWait, - MaxRetries: config.MaxRetries, - Timeout: config.Timeout, - Backoff: config.Backoff, - CheckRetry: config.CheckRetry, - Logger: config.Logger, - Limiter: config.Limiter, - OutputCurlString: config.OutputCurlString, - AgentAddress: config.AgentAddress, - SRVLookup: config.SRVLookup, - CloneHeaders: config.CloneHeaders, + Address: config.Address, + HttpClient: config.HttpClient, + MinRetryWait: config.MinRetryWait, + MaxRetryWait: config.MaxRetryWait, + MaxRetries: config.MaxRetries, + Timeout: config.Timeout, + Backoff: config.Backoff, + CheckRetry: config.CheckRetry, + Logger: config.Logger, + Limiter: config.Limiter, + OutputCurlString: config.OutputCurlString, + AgentAddress: config.AgentAddress, + SRVLookup: config.SRVLookup, + CloneHeaders: config.CloneHeaders, + CloneReplicationStateStore: config.CloneReplicationStateStore, } client, err := NewClient(newConfig) if err != nil { @@ -896,6 +950,16 @@ func (c *Client) Clone() (*Client, error) { client.SetHeaders(c.Headers().Clone()) } + if config.CloneReplicationStateStore { + if c.replicationStateStore != nil { + if err := client.RegisterReplicationStateStore(c.replicationStateStore); err != nil { + return nil, err + } + } else { + c.config.Logger.Warn("Parent has no ReplicationStateStore and CloneReplicationStateStore is specified") + } + } + return client, nil } @@ -1152,7 +1216,7 @@ func (c *Client) WithResponseCallbacks(callbacks ...ResponseCallback) *Client { // by Vault in a response header. func RecordState(state *string) ResponseCallback { return func(resp *Response) { - *state = resp.Header.Get("X-Vault-Index") + *state = resp.Header.Get(HeaderIndex) } } @@ -1162,7 +1226,7 @@ func RecordState(state *string) ResponseCallback { func RequireState(states ...string) RequestCallback { return func(req *Request) { for _, s := range states { - req.Headers.Add("X-Vault-Index", s) + req.Headers.Add(HeaderIndex, s) } } } @@ -1300,3 +1364,46 @@ func DefaultRetryPolicy(ctx context.Context, resp *http.Response, err error) (bo } return false, nil } + +type ReplicationStateStore interface { + HandleResponse(resp *Response) + HandleRequest(resp *Request) + States() []string +} + +// SharedReplicationStateStore stores replication states by providing +// ResponseCallback and RequestCallback methods. +// It can be used when Client Controlled Consistency (VLT-146) is required. +// These methods should be registered in the Client's corresponding callback chains. +type SharedReplicationStateStore struct { + m sync.RWMutex + states []string +} + +// HandleResponse updates the store's replication states with the merger of all states. +// It should be registered in a Client's requestCallback chain. +func (w *SharedReplicationStateStore) HandleResponse(resp *Response) { + w.m.Lock() + defer w.m.Unlock() + newState := resp.Header.Get(HeaderIndex) + if newState != "" { + w.states = MergeReplicationStates(w.states, newState) + } +} + +// HandleRequest updates the request with the store's replication states. +// It should be registered in a Client's responseCallback chain. +func (w *SharedReplicationStateStore) HandleRequest(req *Request) { + w.m.RLock() + defer w.m.RUnlock() + for _, s := range w.states { + req.Headers.Add(HeaderIndex, s) + } +} + +// States currently known to the store. +func (w *SharedReplicationStateStore) States() []string { + w.m.Lock() + defer w.m.Unlock() + return w.states +} diff --git a/api/client_test.go b/api/client_test.go index 3b306a9275dbd..696c2f536dfe3 100644 --- a/api/client_test.go +++ b/api/client_test.go @@ -11,12 +11,15 @@ import ( "net/url" "os" "reflect" + "sort" "strings" + "sync" "testing" "time" "github.com/go-test/deep" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/vault/sdk/helper/consts" ) @@ -412,8 +415,7 @@ func TestClientNonTransportRoundTripper(t *testing.T) { } func TestClone(t *testing.T) { - type fields struct { - } + type fields struct{} tests := []struct { name string config *Config @@ -646,3 +648,362 @@ func TestMergeReplicationStates(t *testing.T) { }) } } + +func TestSharedReplicationStateStore_HandleResponse(t *testing.T) { + b64enc := func(s string) string { + return base64.StdEncoding.EncodeToString([]byte(s)) + } + + tests := []struct { + name string + expected []string + resp []*Response + }{ + { + name: "single", + resp: []*Response{ + { + Response: &http.Response{ + Header: map[string][]string{ + HeaderIndex: { + b64enc("v1:cid:1:0:"), + }, + }, + }, + }, + }, + expected: []string{ + b64enc("v1:cid:1:0:"), + }, + }, + { + name: "empty", + resp: []*Response{ + { + Response: &http.Response{ + Header: map[string][]string{}, + }, + }, + }, + expected: nil, + }, + { + name: "multiple", + resp: []*Response{ + { + Response: &http.Response{ + Header: map[string][]string{ + HeaderIndex: { + b64enc("v1:cid:0:1:"), + }, + }, + }, + }, + { + Response: &http.Response{ + Header: map[string][]string{ + HeaderIndex: { + b64enc("v1:cid:1:0:"), + }, + }, + }, + }, + }, + expected: []string{ + b64enc("v1:cid:0:1:"), + b64enc("v1:cid:1:0:"), + }, + }, + { + name: "duplicates", + resp: []*Response{ + { + Response: &http.Response{ + Header: map[string][]string{ + HeaderIndex: { + b64enc("v1:cid:1:0:"), + }, + }, + }, + }, + { + Response: &http.Response{ + Header: map[string][]string{ + HeaderIndex: { + b64enc("v1:cid:1:0:"), + }, + }, + }, + }, + }, + expected: []string{ + b64enc("v1:cid:1:0:"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &SharedReplicationStateStore{} + + var wg sync.WaitGroup + for _, r := range tt.resp { + wg.Add(1) + go func(r *Response) { + defer wg.Done() + w.HandleResponse(r) + }(r) + } + wg.Wait() + + if !reflect.DeepEqual(tt.expected, w.states) { + t.Errorf("HandleResponse(): expected states %v, actual %v", tt.expected, w.states) + } + }) + } +} + +func TestSharedReplicationStateStore_HandleRequest(t *testing.T) { + tests := []struct { + name string + states []string + req []*Request + expected []string + }{ + { + name: "empty", + states: []string{}, + req: []*Request{ + { + Headers: make(http.Header), + }, + }, + expected: nil, + }, + { + name: "basic", + states: []string{ + "v1:cid:0:1:", + "v1:cid:1:0:", + }, + req: []*Request{ + { + Headers: make(http.Header), + }, + }, + expected: []string{ + "v1:cid:0:1:", + "v1:cid:1:0:", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + store := &SharedReplicationStateStore{ + states: tt.states, + } + + start := make(chan interface{}) + done := make(chan interface{}) + + for _, r := range tt.req { + go func(r *Request) { + <-start + store.HandleRequest(r) + done <- true + }(r) + } + + close(start) + + for i := 0; i < len(tt.req); i++ { + <-done + } + + var actual []string + for _, r := range tt.req { + if values := r.Headers.Values(HeaderIndex); len(values) > 0 { + actual = append(actual, values...) + } + } + sort.Strings(actual) + if !reflect.DeepEqual(tt.expected, actual) { + t.Errorf("HandleRequest(): expected states %v, actual %v", tt.expected, actual) + } + }) + } +} + +func TestClient_RegisterReplicationStateStore(t *testing.T) { + b64enc := func(s string) string { + return base64.StdEncoding.EncodeToString([]byte(s)) + } + + handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Header().Set(HeaderIndex, strings.TrimLeft(req.URL.Path, "/")) + }) + + tests := []struct { + name string + handler http.Handler + wantStates []string + values [][]string + clone bool + }{ + { + name: "multiple_duplicates", + clone: false, + handler: handler, + wantStates: []string{ + b64enc("v1:cid:0:4:"), + }, + values: [][]string{ + { + b64enc("v1:cid:0:4:"), + b64enc("v1:cid:0:2:"), + }, + { + b64enc("v1:cid:0:4:"), + b64enc("v1:cid:0:2:"), + }, + }, + }, + { + name: "basic_clone", + clone: true, + handler: handler, + wantStates: []string{ + b64enc("v1:cid:0:4:"), + }, + values: [][]string{ + { + b64enc("v1:cid:0:4:"), + }, + { + b64enc("v1:cid:0:3:"), + }, + }, + }, + { + name: "multiple_clone", + clone: true, + handler: handler, + wantStates: []string{ + b64enc("v1:cid:0:4:"), + }, + values: [][]string{ + { + b64enc("v1:cid:0:4:"), + b64enc("v1:cid:0:2:"), + }, + { + b64enc("v1:cid:0:3:"), + b64enc("v1:cid:0:1:"), + }, + }, + }, + { + name: "multiple_duplicates_clone", + clone: true, + handler: handler, + wantStates: []string{ + b64enc("v1:cid:0:4:"), + }, + values: [][]string{ + { + b64enc("v1:cid:0:4:"), + b64enc("v1:cid:0:2:"), + }, + { + b64enc("v1:cid:0:4:"), + b64enc("v1:cid:0:2:"), + }, + }, + }, + { + name: "multiple_duplicates", + clone: false, + handler: handler, + wantStates: []string{ + b64enc("v1:cid:0:4:"), + }, + values: [][]string{ + { + b64enc("v1:cid:0:4:"), + b64enc("v1:cid:0:2:"), + }, + { + b64enc("v1:cid:0:4:"), + b64enc("v1:cid:0:2:"), + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testRequest := func(client *Client, val string) { + req := client.NewRequest("GET", "/"+val) + req.Headers.Set(HeaderIndex, val) + resp, err := client.RawRequestWithContext(context.Background(), req) + if err != nil { + t.Fatal(err) + } + + // validate that the server provided a valid header value in its response + actual := resp.Header.Get(HeaderIndex) + if actual != val { + t.Errorf("expected header value %v, actual %v", val, actual) + } + } + + config, ln := testHTTPServer(t, handler) + defer ln.Close() + + config.Address = fmt.Sprintf("http://%s", ln.Addr()) + parent, err := NewClient(config) + if err != nil { + t.Fatal(err) + } + + parent.SetCloneReplicationStateStore(true) + if err := parent.RegisterReplicationStateStore(&SharedReplicationStateStore{}); err != nil { + t.Fatal(err) + } + + start := make(chan interface{}) + done := make(chan interface{}) + + var n int + for i := 0; i < len(tt.values); i++ { + var c *Client + if tt.clone { + c, err = parent.Clone() + if err != nil { + t.Fatal(err) + } + } else { + c = parent + } + + for _, val := range tt.values[i] { + n += 1 + go func(val string) { + <-start + testRequest(c, val) + done <- true + }(val) + } + } + + close(start) + for i := 0; i < n; i++ { + <-done + } + + if !reflect.DeepEqual(tt.wantStates, parent.replicationStateStore.States()) { + t.Errorf("expected states %v, actual %v", tt.wantStates, parent.replicationStateStore.States()) + } + }) + } +} From 70f7d1a6651f93ed1ed4dd11b062d7db62b8f1c0 Mon Sep 17 00:00:00 2001 From: Ben Ash Date: Wed, 13 Oct 2021 12:57:33 -0400 Subject: [PATCH 3/8] Post review updates - no longer rely in response/request callbacks for updating the replication state store - make setting up the store simpler by adding a configuration directive - address other comments. - revert go version changes go.mod --- api/client.go | 146 +++++++++++++++++++------------------------- api/client_test.go | 50 +++++---------- api/go.mod | 2 +- api/go.sum | 1 + changelog/12814.txt | 0 5 files changed, 78 insertions(+), 121 deletions(-) create mode 100644 changelog/12814.txt diff --git a/api/client.go b/api/client.go index 2a53db835135b..42fd6a695bfd3 100644 --- a/api/client.go +++ b/api/client.go @@ -138,9 +138,10 @@ type Config struct { // CloneHeaders ensures that the source client's headers are copied to its clone. CloneHeaders bool - // CloneReplicationStateStore ensures that the source client's ReplicationStateStore - // is registered in the clone. - CloneReplicationStateStore bool + // PreventStaleReads enables the Client to require discovered cluster replication states + // in every request. + // The shared state is automatically propagated to all Client clones. + PreventStaleReads bool } // TLSConfig contains the parameters needed to configure TLS on the HTTP client @@ -431,7 +432,7 @@ type Client struct { policyOverride bool requestCallbacks []RequestCallback responseCallbacks []ResponseCallback - replicationStateStore ReplicationStateStore + replicationStateStore *replicationStateStore } // NewClient returns a new client for the given configuration. @@ -505,6 +506,10 @@ func NewClient(c *Config) (*Client, error) { headers: make(http.Header), } + if c.PreventStaleReads { + client.replicationStateStore = &replicationStateStore{} + } + // Add the VaultRequest SSRF protection header client.headers[consts.RequestHeaderName] = []string{"true"} @@ -519,31 +524,6 @@ func NewClient(c *Config) (*Client, error) { return client, nil } -// RegisterReplicationStateStore for tracking replication states across all requests and responses. -// The ReplicationStateStore will be registered in both the request and response callback registries. -func (c *Client) RegisterReplicationStateStore(store ReplicationStateStore) error { - if c.replicationStateStore != nil { - return fmt.Errorf("replication state store already registered") - } - - c.modifyLock.Lock() - defer c.modifyLock.Unlock() - - c.replicationStateStore = store - - if len(c.requestCallbacks) == 0 { - c.requestCallbacks = []RequestCallback{} - } - c.requestCallbacks = append(c.requestCallbacks, c.replicationStateStore.HandleRequest) - - if len(c.responseCallbacks) == 0 { - c.responseCallbacks = []ResponseCallback{} - } - c.responseCallbacks = append(c.responseCallbacks, c.replicationStateStore.HandleResponse) - - return nil -} - func (c *Client) CloneConfig() *Config { c.modifyLock.RLock() defer c.modifyLock.RUnlock() @@ -562,7 +542,7 @@ func (c *Client) CloneConfig() *Config { newConfig.OutputCurlString = c.config.OutputCurlString newConfig.SRVLookup = c.config.SRVLookup newConfig.CloneHeaders = c.config.CloneHeaders - newConfig.CloneReplicationStateStore = c.config.CloneReplicationStateStore + newConfig.PreventStaleReads = c.config.PreventStaleReads // we specifically want a _copy_ of the client here, not a pointer to the original one newClient := *c.config.HttpClient @@ -888,24 +868,30 @@ func (c *Client) CloneHeaders() bool { return c.config.CloneHeaders } -// SetCloneReplicationStateStore to clone the client's ReplicationStateStore -func (c *Client) SetCloneReplicationStateStore(val bool) { +// SetPreventStaleReads to prevent reading stale cluster replication state. +func (c *Client) SetPreventStaleReads(preventStaleReads bool) { c.modifyLock.Lock() defer c.modifyLock.Unlock() c.config.modifyLock.Lock() defer c.config.modifyLock.Unlock() - c.config.CloneReplicationStateStore = val + if preventStaleReads && c.replicationStateStore == nil { + c.replicationStateStore = &replicationStateStore{} + } else { + c.replicationStateStore = nil + } + + c.config.PreventStaleReads = preventStaleReads } -// CloneReplicationStateStore gets the configured value. -func (c *Client) CloneReplicationStateStore() bool { +// PreventStaleReads gets the configured value of PreventStaleReads +func (c *Client) PreventStaleReads() bool { c.modifyLock.RLock() defer c.modifyLock.RUnlock() c.config.modifyLock.RLock() defer c.config.modifyLock.RUnlock() - return c.config.CloneReplicationStateStore + return c.config.PreventStaleReads } // Clone creates a new client with the same configuration. Note that the same @@ -925,21 +911,21 @@ func (c *Client) Clone() (*Client, error) { defer config.modifyLock.RUnlock() newConfig := &Config{ - Address: config.Address, - HttpClient: config.HttpClient, - MinRetryWait: config.MinRetryWait, - MaxRetryWait: config.MaxRetryWait, - MaxRetries: config.MaxRetries, - Timeout: config.Timeout, - Backoff: config.Backoff, - CheckRetry: config.CheckRetry, - Logger: config.Logger, - Limiter: config.Limiter, - OutputCurlString: config.OutputCurlString, - AgentAddress: config.AgentAddress, - SRVLookup: config.SRVLookup, - CloneHeaders: config.CloneHeaders, - CloneReplicationStateStore: config.CloneReplicationStateStore, + Address: config.Address, + HttpClient: config.HttpClient, + MinRetryWait: config.MinRetryWait, + MaxRetryWait: config.MaxRetryWait, + MaxRetries: config.MaxRetries, + Timeout: config.Timeout, + Backoff: config.Backoff, + CheckRetry: config.CheckRetry, + Logger: config.Logger, + Limiter: config.Limiter, + OutputCurlString: config.OutputCurlString, + AgentAddress: config.AgentAddress, + SRVLookup: config.SRVLookup, + CloneHeaders: config.CloneHeaders, + PreventStaleReads: config.PreventStaleReads, } client, err := NewClient(newConfig) if err != nil { @@ -950,15 +936,7 @@ func (c *Client) Clone() (*Client, error) { client.SetHeaders(c.Headers().Clone()) } - if config.CloneReplicationStateStore { - if c.replicationStateStore != nil { - if err := client.RegisterReplicationStateStore(c.replicationStateStore); err != nil { - return nil, err - } - } else { - c.config.Logger.Warn("Parent has no ReplicationStateStore and CloneReplicationStateStore is specified") - } - } + client.SetPreventStaleReads(config.PreventStaleReads) return client, nil } @@ -1065,6 +1043,10 @@ func (c *Client) RawRequestWithContext(ctx context.Context, r *Request) (*Respon cb(r) } + if c.config.PreventStaleReads { + c.replicationStateStore.requireStates(r) + } + if limiter != nil { limiter.Wait(ctx) } @@ -1175,6 +1157,10 @@ START: for _, cb := range c.responseCallbacks { cb(result) } + + if c.config.PreventStaleReads { + c.replicationStateStore.recordStates(result) + } } if err := result.Error(); err != nil { return result, err @@ -1365,45 +1351,37 @@ func DefaultRetryPolicy(ctx context.Context, resp *http.Response, err error) (bo return false, nil } -type ReplicationStateStore interface { - HandleResponse(resp *Response) - HandleRequest(resp *Request) - States() []string -} - -// SharedReplicationStateStore stores replication states by providing -// ResponseCallback and RequestCallback methods. -// It can be used when Client Controlled Consistency (VLT-146) is required. -// These methods should be registered in the Client's corresponding callback chains. -type SharedReplicationStateStore struct { - m sync.RWMutex - states []string +// replicationStateStore is used to track cluster replication states +// in order to prevent stale reads. +type replicationStateStore struct { + m sync.RWMutex + store []string } -// HandleResponse updates the store's replication states with the merger of all states. -// It should be registered in a Client's requestCallback chain. -func (w *SharedReplicationStateStore) HandleResponse(resp *Response) { +// recordStates updates the store's replication states with the merger of all states. +func (w *replicationStateStore) recordStates(resp *Response) { w.m.Lock() defer w.m.Unlock() newState := resp.Header.Get(HeaderIndex) if newState != "" { - w.states = MergeReplicationStates(w.states, newState) + w.store = MergeReplicationStates(w.store, newState) } } -// HandleRequest updates the request with the store's replication states. -// It should be registered in a Client's responseCallback chain. -func (w *SharedReplicationStateStore) HandleRequest(req *Request) { +// requireStates updates the Request with the store's current replication states. +func (w *replicationStateStore) requireStates(req *Request) { w.m.RLock() defer w.m.RUnlock() - for _, s := range w.states { + for _, s := range w.store { req.Headers.Add(HeaderIndex, s) } } -// States currently known to the store. -func (w *SharedReplicationStateStore) States() []string { +// states currently stored. +func (w *replicationStateStore) states() []string { w.m.Lock() defer w.m.Unlock() - return w.states + c := make([]string, len(w.store)) + copy(c, w.store) + return c } diff --git a/api/client_test.go b/api/client_test.go index 696c2f536dfe3..d67f79483c3c5 100644 --- a/api/client_test.go +++ b/api/client_test.go @@ -649,7 +649,7 @@ func TestMergeReplicationStates(t *testing.T) { } } -func TestSharedReplicationStateStore_HandleResponse(t *testing.T) { +func TestReplicationStateStore_recordState(t *testing.T) { b64enc := func(s string) string { return base64.StdEncoding.EncodeToString([]byte(s)) } @@ -744,26 +744,26 @@ func TestSharedReplicationStateStore_HandleResponse(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - w := &SharedReplicationStateStore{} + w := &replicationStateStore{} var wg sync.WaitGroup for _, r := range tt.resp { wg.Add(1) go func(r *Response) { defer wg.Done() - w.HandleResponse(r) + w.recordStates(r) }(r) } wg.Wait() - if !reflect.DeepEqual(tt.expected, w.states) { - t.Errorf("HandleResponse(): expected states %v, actual %v", tt.expected, w.states) + if !reflect.DeepEqual(tt.expected, w.store) { + t.Errorf("recordStates(): expected states %v, actual %v", tt.expected, w.store) } }) } } -func TestSharedReplicationStateStore_HandleRequest(t *testing.T) { +func TestReplicationStateStore_requireStates(t *testing.T) { tests := []struct { name string states []string @@ -799,8 +799,8 @@ func TestSharedReplicationStateStore_HandleRequest(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - store := &SharedReplicationStateStore{ - states: tt.states, + store := &replicationStateStore{ + store: tt.states, } start := make(chan interface{}) @@ -809,7 +809,7 @@ func TestSharedReplicationStateStore_HandleRequest(t *testing.T) { for _, r := range tt.req { go func(r *Request) { <-start - store.HandleRequest(r) + store.requireStates(r) done <- true }(r) } @@ -828,13 +828,13 @@ func TestSharedReplicationStateStore_HandleRequest(t *testing.T) { } sort.Strings(actual) if !reflect.DeepEqual(tt.expected, actual) { - t.Errorf("HandleRequest(): expected states %v, actual %v", tt.expected, actual) + t.Errorf("requireStates(): expected states %v, actual %v", tt.expected, actual) } }) } } -func TestClient_RegisterReplicationStateStore(t *testing.T) { +func TestClient_PreventDirtyReads(t *testing.T) { b64enc := func(s string) string { return base64.StdEncoding.EncodeToString([]byte(s)) } @@ -920,24 +920,6 @@ func TestClient_RegisterReplicationStateStore(t *testing.T) { }, }, }, - { - name: "multiple_duplicates", - clone: false, - handler: handler, - wantStates: []string{ - b64enc("v1:cid:0:4:"), - }, - values: [][]string{ - { - b64enc("v1:cid:0:4:"), - b64enc("v1:cid:0:2:"), - }, - { - b64enc("v1:cid:0:4:"), - b64enc("v1:cid:0:2:"), - }, - }, - }, } for _, tt := range tests { @@ -960,17 +942,13 @@ func TestClient_RegisterReplicationStateStore(t *testing.T) { config, ln := testHTTPServer(t, handler) defer ln.Close() + config.PreventStaleReads = true config.Address = fmt.Sprintf("http://%s", ln.Addr()) parent, err := NewClient(config) if err != nil { t.Fatal(err) } - parent.SetCloneReplicationStateStore(true) - if err := parent.RegisterReplicationStateStore(&SharedReplicationStateStore{}); err != nil { - t.Fatal(err) - } - start := make(chan interface{}) done := make(chan interface{}) @@ -1001,8 +979,8 @@ func TestClient_RegisterReplicationStateStore(t *testing.T) { <-done } - if !reflect.DeepEqual(tt.wantStates, parent.replicationStateStore.States()) { - t.Errorf("expected states %v, actual %v", tt.wantStates, parent.replicationStateStore.States()) + if !reflect.DeepEqual(tt.wantStates, parent.replicationStateStore.states()) { + t.Errorf("expected states %v, actual %v", tt.wantStates, parent.replicationStateStore.states()) } }) } diff --git a/api/go.mod b/api/go.mod index f7f9b51e66471..b0924e260a60e 100644 --- a/api/go.mod +++ b/api/go.mod @@ -1,6 +1,6 @@ module github.com/hashicorp/vault/api -go 1.16 +go 1.13 replace github.com/hashicorp/vault/sdk => ../sdk diff --git a/api/go.sum b/api/go.sum index c79dea5c1bb7d..d5e55d2f58bae 100644 --- a/api/go.sum +++ b/api/go.sum @@ -311,6 +311,7 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/square/go-jose.v2 v2.5.1 h1:7odma5RETjNHWJnR32wx8t+Io4djHE1PqxCFx3iiZ2w= gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= diff --git a/changelog/12814.txt b/changelog/12814.txt new file mode 100644 index 0000000000000..e69de29bb2d1d From 26762cf7c219f78ff9c3baaba5cf551f2d4b3f64 Mon Sep 17 00:00:00 2001 From: Ben Ash Date: Wed, 13 Oct 2021 14:30:03 -0400 Subject: [PATCH 4/8] Update PreventStaleRead docs, add changelog --- api/client.go | 5 ++++- changelog/12814.txt | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/api/client.go b/api/client.go index 42fd6a695bfd3..1189c420eb946 100644 --- a/api/client.go +++ b/api/client.go @@ -139,8 +139,11 @@ type Config struct { CloneHeaders bool // PreventStaleReads enables the Client to require discovered cluster replication states - // in every request. + // for each request. // The shared state is automatically propagated to all Client clones. + // + // Note: Careful consideration should be made prior to enabling this setting since there will + // be a performance penalty paid upon each request. This feature requires Enterprise server-side. PreventStaleReads bool } diff --git a/changelog/12814.txt b/changelog/12814.txt index e69de29bb2d1d..15b55631c851e 100644 --- a/changelog/12814.txt +++ b/changelog/12814.txt @@ -0,0 +1,3 @@ +```release-note:improvement +api: Add support for preventing stale reads by automatically requiring known cluster replication states in each request. +``` From d03f0188f99df30d58de7c5349a55c9d0810ada5 Mon Sep 17 00:00:00 2001 From: Ben Ash Date: Wed, 13 Oct 2021 15:08:10 -0400 Subject: [PATCH 5/8] fix regression when cloning --- api/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/client.go b/api/client.go index 1189c420eb946..2b5789602ad2f 100644 --- a/api/client.go +++ b/api/client.go @@ -939,7 +939,7 @@ func (c *Client) Clone() (*Client, error) { client.SetHeaders(c.Headers().Clone()) } - client.SetPreventStaleReads(config.PreventStaleReads) + client.replicationStateStore = c.replicationStateStore return client, nil } From d72bcdbf99675e8a4d0ae7035d06d2152b519a87 Mon Sep 17 00:00:00 2001 From: Ben Ash Date: Wed, 13 Oct 2021 16:04:33 -0400 Subject: [PATCH 6/8] Extend TestClone for PreventStaleRead - replace the use of channels with a WaitGroup --- api/client.go | 17 ++++++++------- api/client_test.go | 52 +++++++++++++++++++++++----------------------- 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/api/client.go b/api/client.go index 2b5789602ad2f..770d0c5730998 100644 --- a/api/client.go +++ b/api/client.go @@ -1047,7 +1047,7 @@ func (c *Client) RawRequestWithContext(ctx context.Context, r *Request) (*Respon } if c.config.PreventStaleReads { - c.replicationStateStore.requireStates(r) + c.replicationStateStore.requireState(r) } if limiter != nil { @@ -1162,7 +1162,7 @@ START: } if c.config.PreventStaleReads { - c.replicationStateStore.recordStates(result) + c.replicationStateStore.recordState(result) } } if err := result.Error(); err != nil { @@ -1361,8 +1361,9 @@ type replicationStateStore struct { store []string } -// recordStates updates the store's replication states with the merger of all states. -func (w *replicationStateStore) recordStates(resp *Response) { +// recordState updates the store's replication states with the merger of all +// states. +func (w *replicationStateStore) recordState(resp *Response) { w.m.Lock() defer w.m.Unlock() newState := resp.Header.Get(HeaderIndex) @@ -1371,8 +1372,8 @@ func (w *replicationStateStore) recordStates(resp *Response) { } } -// requireStates updates the Request with the store's current replication states. -func (w *replicationStateStore) requireStates(req *Request) { +// requireState updates the Request with the store's current replication states. +func (w *replicationStateStore) requireState(req *Request) { w.m.RLock() defer w.m.RUnlock() for _, s := range w.store { @@ -1382,8 +1383,8 @@ func (w *replicationStateStore) requireStates(req *Request) { // states currently stored. func (w *replicationStateStore) states() []string { - w.m.Lock() - defer w.m.Unlock() + w.m.RLock() + defer w.m.RUnlock() c := make([]string, len(w.store)) copy(c, w.store) return c diff --git a/api/client_test.go b/api/client_test.go index d67f79483c3c5..b5111c677760d 100644 --- a/api/client_test.go +++ b/api/client_test.go @@ -435,6 +435,12 @@ func TestClone(t *testing.T) { "X-baz": []string{"qux"}, }, }, + { + name: "preventStaleReads", + config: &Config{ + PreventStaleReads: true, + }, + }, } for _, tt := range tests { @@ -514,6 +520,12 @@ func TestClone(t *testing.T) { } } } + if tt.config.PreventStaleReads && client1.replicationStateStore == nil { + t.Fatalf("replicationStateStore is nil") + } + if !reflect.DeepEqual(client1.replicationStateStore, client2.replicationStateStore) { + t.Fatalf("expected replicationStateStore %v, actual %v", client1.replicationStateStore, client2.replicationStateStore) + } }) } } @@ -751,19 +763,19 @@ func TestReplicationStateStore_recordState(t *testing.T) { wg.Add(1) go func(r *Response) { defer wg.Done() - w.recordStates(r) + w.recordState(r) }(r) } wg.Wait() if !reflect.DeepEqual(tt.expected, w.store) { - t.Errorf("recordStates(): expected states %v, actual %v", tt.expected, w.store) + t.Errorf("recordState(): expected states %v, actual %v", tt.expected, w.store) } }) } } -func TestReplicationStateStore_requireStates(t *testing.T) { +func TestReplicationStateStore_requireState(t *testing.T) { tests := []struct { name string states []string @@ -797,28 +809,23 @@ func TestReplicationStateStore_requireStates(t *testing.T) { }, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { store := &replicationStateStore{ store: tt.states, } - start := make(chan interface{}) - done := make(chan interface{}) - + var wg sync.WaitGroup for _, r := range tt.req { + wg.Add(1) go func(r *Request) { - <-start - store.requireStates(r) - done <- true + defer wg.Done() + store.requireState(r) }(r) } - close(start) - - for i := 0; i < len(tt.req); i++ { - <-done - } + wg.Wait() var actual []string for _, r := range tt.req { @@ -828,7 +835,7 @@ func TestReplicationStateStore_requireStates(t *testing.T) { } sort.Strings(actual) if !reflect.DeepEqual(tt.expected, actual) { - t.Errorf("requireStates(): expected states %v, actual %v", tt.expected, actual) + t.Errorf("requireState(): expected states %v, actual %v", tt.expected, actual) } }) } @@ -949,10 +956,7 @@ func TestClient_PreventDirtyReads(t *testing.T) { t.Fatal(err) } - start := make(chan interface{}) - done := make(chan interface{}) - - var n int + var wg sync.WaitGroup for i := 0; i < len(tt.values); i++ { var c *Client if tt.clone { @@ -965,19 +969,15 @@ func TestClient_PreventDirtyReads(t *testing.T) { } for _, val := range tt.values[i] { - n += 1 + wg.Add(1) go func(val string) { - <-start + defer wg.Done() testRequest(c, val) - done <- true }(val) } } - close(start) - for i := 0; i < n; i++ { - <-done - } + wg.Wait() if !reflect.DeepEqual(tt.wantStates, parent.replicationStateStore.states()) { t.Errorf("expected states %v, actual %v", tt.wantStates, parent.replicationStateStore.states()) From 40859d16b1484920efb8cde339278f32622052cc Mon Sep 17 00:00:00 2001 From: Ben Ash Date: Wed, 13 Oct 2021 16:13:40 -0400 Subject: [PATCH 7/8] Fix documentation wrapping. --- api/client.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/api/client.go b/api/client.go index 770d0c5730998..c312cff163ce9 100644 --- a/api/client.go +++ b/api/client.go @@ -135,15 +135,17 @@ type Config struct { // SRVLookup enables the client to lookup the host through DNS SRV lookup SRVLookup bool - // CloneHeaders ensures that the source client's headers are copied to its clone. + // CloneHeaders ensures that the source client's headers are copied to + // its clone. CloneHeaders bool - // PreventStaleReads enables the Client to require discovered cluster replication states - // for each request. + // PreventStaleReads enables the Client to require discovered cluster + // replication states for each request. // The shared state is automatically propagated to all Client clones. // - // Note: Careful consideration should be made prior to enabling this setting since there will - // be a performance penalty paid upon each request. This feature requires Enterprise server-side. + // Note: Careful consideration should be made prior to enabling this setting + // since there will be a performance penalty paid upon each request. + // This feature requires Enterprise server-side. PreventStaleReads bool } From d8c472e1e8f3269620da3ee01c506ffd3b27edbc Mon Sep 17 00:00:00 2001 From: Ben Ash Date: Thu, 14 Oct 2021 13:29:09 -0400 Subject: [PATCH 8/8] Rename PreventStaleReads to ReadYourWrites - replace doc references to stale reads with read-after-write --- api/client.go | 58 ++++++++++++++++++++++----------------------- api/client_test.go | 11 +++++---- changelog/12814.txt | 2 +- 3 files changed, 36 insertions(+), 35 deletions(-) diff --git a/api/client.go b/api/client.go index c312cff163ce9..2d1c3b683f43e 100644 --- a/api/client.go +++ b/api/client.go @@ -139,14 +139,14 @@ type Config struct { // its clone. CloneHeaders bool - // PreventStaleReads enables the Client to require discovered cluster - // replication states for each request. + // ReadYourWrites ensures isolated read-after-write semantics by + // providing discovered cluster replication states in each request. // The shared state is automatically propagated to all Client clones. // // Note: Careful consideration should be made prior to enabling this setting // since there will be a performance penalty paid upon each request. // This feature requires Enterprise server-side. - PreventStaleReads bool + ReadYourWrites bool } // TLSConfig contains the parameters needed to configure TLS on the HTTP client @@ -511,7 +511,7 @@ func NewClient(c *Config) (*Client, error) { headers: make(http.Header), } - if c.PreventStaleReads { + if c.ReadYourWrites { client.replicationStateStore = &replicationStateStore{} } @@ -547,7 +547,7 @@ func (c *Client) CloneConfig() *Config { newConfig.OutputCurlString = c.config.OutputCurlString newConfig.SRVLookup = c.config.SRVLookup newConfig.CloneHeaders = c.config.CloneHeaders - newConfig.PreventStaleReads = c.config.PreventStaleReads + newConfig.ReadYourWrites = c.config.ReadYourWrites // we specifically want a _copy_ of the client here, not a pointer to the original one newClient := *c.config.HttpClient @@ -873,8 +873,8 @@ func (c *Client) CloneHeaders() bool { return c.config.CloneHeaders } -// SetPreventStaleReads to prevent reading stale cluster replication state. -func (c *Client) SetPreventStaleReads(preventStaleReads bool) { +// SetReadYourWrites to prevent reading stale cluster replication state. +func (c *Client) SetReadYourWrites(preventStaleReads bool) { c.modifyLock.Lock() defer c.modifyLock.Unlock() c.config.modifyLock.Lock() @@ -886,17 +886,17 @@ func (c *Client) SetPreventStaleReads(preventStaleReads bool) { c.replicationStateStore = nil } - c.config.PreventStaleReads = preventStaleReads + c.config.ReadYourWrites = preventStaleReads } -// PreventStaleReads gets the configured value of PreventStaleReads -func (c *Client) PreventStaleReads() bool { +// ReadYourWrites gets the configured value of ReadYourWrites +func (c *Client) ReadYourWrites() bool { c.modifyLock.RLock() defer c.modifyLock.RUnlock() c.config.modifyLock.RLock() defer c.config.modifyLock.RUnlock() - return c.config.PreventStaleReads + return c.config.ReadYourWrites } // Clone creates a new client with the same configuration. Note that the same @@ -916,21 +916,21 @@ func (c *Client) Clone() (*Client, error) { defer config.modifyLock.RUnlock() newConfig := &Config{ - Address: config.Address, - HttpClient: config.HttpClient, - MinRetryWait: config.MinRetryWait, - MaxRetryWait: config.MaxRetryWait, - MaxRetries: config.MaxRetries, - Timeout: config.Timeout, - Backoff: config.Backoff, - CheckRetry: config.CheckRetry, - Logger: config.Logger, - Limiter: config.Limiter, - OutputCurlString: config.OutputCurlString, - AgentAddress: config.AgentAddress, - SRVLookup: config.SRVLookup, - CloneHeaders: config.CloneHeaders, - PreventStaleReads: config.PreventStaleReads, + Address: config.Address, + HttpClient: config.HttpClient, + MinRetryWait: config.MinRetryWait, + MaxRetryWait: config.MaxRetryWait, + MaxRetries: config.MaxRetries, + Timeout: config.Timeout, + Backoff: config.Backoff, + CheckRetry: config.CheckRetry, + Logger: config.Logger, + Limiter: config.Limiter, + OutputCurlString: config.OutputCurlString, + AgentAddress: config.AgentAddress, + SRVLookup: config.SRVLookup, + CloneHeaders: config.CloneHeaders, + ReadYourWrites: config.ReadYourWrites, } client, err := NewClient(newConfig) if err != nil { @@ -1048,7 +1048,7 @@ func (c *Client) RawRequestWithContext(ctx context.Context, r *Request) (*Respon cb(r) } - if c.config.PreventStaleReads { + if c.config.ReadYourWrites { c.replicationStateStore.requireState(r) } @@ -1163,7 +1163,7 @@ START: cb(result) } - if c.config.PreventStaleReads { + if c.config.ReadYourWrites { c.replicationStateStore.recordState(result) } } @@ -1357,7 +1357,7 @@ func DefaultRetryPolicy(ctx context.Context, resp *http.Response, err error) (bo } // replicationStateStore is used to track cluster replication states -// in order to prevent stale reads. +// in order to ensure proper read-after-write semantics for a Client. type replicationStateStore struct { m sync.RWMutex store []string diff --git a/api/client_test.go b/api/client_test.go index b5111c677760d..f335a765b3ed3 100644 --- a/api/client_test.go +++ b/api/client_test.go @@ -438,7 +438,7 @@ func TestClone(t *testing.T) { { name: "preventStaleReads", config: &Config{ - PreventStaleReads: true, + ReadYourWrites: true, }, }, } @@ -520,11 +520,12 @@ func TestClone(t *testing.T) { } } } - if tt.config.PreventStaleReads && client1.replicationStateStore == nil { + if tt.config.ReadYourWrites && client1.replicationStateStore == nil { t.Fatalf("replicationStateStore is nil") } if !reflect.DeepEqual(client1.replicationStateStore, client2.replicationStateStore) { - t.Fatalf("expected replicationStateStore %v, actual %v", client1.replicationStateStore, client2.replicationStateStore) + t.Fatalf("expected replicationStateStore %v, actual %v", client1.replicationStateStore, + client2.replicationStateStore) } }) } @@ -841,7 +842,7 @@ func TestReplicationStateStore_requireState(t *testing.T) { } } -func TestClient_PreventDirtyReads(t *testing.T) { +func TestClient_ReadYourWrites(t *testing.T) { b64enc := func(s string) string { return base64.StdEncoding.EncodeToString([]byte(s)) } @@ -949,7 +950,7 @@ func TestClient_PreventDirtyReads(t *testing.T) { config, ln := testHTTPServer(t, handler) defer ln.Close() - config.PreventStaleReads = true + config.ReadYourWrites = true config.Address = fmt.Sprintf("http://%s", ln.Addr()) parent, err := NewClient(config) if err != nil { diff --git a/changelog/12814.txt b/changelog/12814.txt index 15b55631c851e..9d5b541d6d841 100644 --- a/changelog/12814.txt +++ b/changelog/12814.txt @@ -1,3 +1,3 @@ ```release-note:improvement -api: Add support for preventing stale reads by automatically requiring known cluster replication states in each request. +api: Add configuration option for ensuring isolated read-after-write semantics for all Client requests. ```