From 54382d46ccf49b165cfbac699a6b569f56cc207e Mon Sep 17 00:00:00 2001 From: Antoine Tollenaere Date: Fri, 26 Apr 2024 16:01:10 +0200 Subject: [PATCH 1/4] ringhash: allow setting request hash key explicitly Implement part 1 of A76: the ability to set the request hash key, to extract the hash from a header. This allows using ring hash without xDS. --- xds/internal/balancer/ringhash/config.go | 8 +++ xds/internal/balancer/ringhash/config_test.go | 15 ++++ xds/internal/balancer/ringhash/picker.go | 52 ++++++++++---- xds/internal/balancer/ringhash/picker_test.go | 69 ++++++++++++++++--- xds/internal/balancer/ringhash/ringhash.go | 2 +- .../balancer/ringhash/ringhash_test.go | 48 ++++++++++++- xds/internal/balancer/ringhash/util.go | 46 +++++++++---- xds/internal/balancer/ringhash/util_test.go | 65 +++++++++++++++++ xds/internal/resolver/serviceconfig.go | 2 +- xds/internal/resolver/xds_resolver_test.go | 2 +- 10 files changed, 272 insertions(+), 37 deletions(-) create mode 100644 xds/internal/balancer/ringhash/util_test.go diff --git a/xds/internal/balancer/ringhash/config.go b/xds/internal/balancer/ringhash/config.go index b4afcf10013..23368fe97f6 100644 --- a/xds/internal/balancer/ringhash/config.go +++ b/xds/internal/balancer/ringhash/config.go @@ -23,6 +23,7 @@ import ( "fmt" "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/metadata" "google.golang.org/grpc/serviceconfig" ) @@ -32,6 +33,8 @@ type LBConfig struct { MinRingSize uint64 `json:"minRingSize,omitempty"` MaxRingSize uint64 `json:"maxRingSize,omitempty"` + + RequestMetadataKey string `json:"request_metadata_key,omitempty"` } const ( @@ -66,5 +69,10 @@ func parseConfig(c json.RawMessage) (*LBConfig, error) { if cfg.MaxRingSize > envconfig.RingHashCap { cfg.MaxRingSize = envconfig.RingHashCap } + if cfg.RequestMetadataKey != "" { + if err := metadata.ValidatePair(cfg.RequestMetadataKey, ""); err != nil { + return nil, fmt.Errorf("invalid request_metadata_key %q: %w", cfg.RequestMetadataKey, err) + } + } return &cfg, nil } diff --git a/xds/internal/balancer/ringhash/config_test.go b/xds/internal/balancer/ringhash/config_test.go index 1077d3e7daf..107163b827f 100644 --- a/xds/internal/balancer/ringhash/config_test.go +++ b/xds/internal/balancer/ringhash/config_test.go @@ -94,6 +94,21 @@ func (s) TestParseConfig(t *testing.T) { want: nil, wantErr: true, }, + { + name: "request metadata key set", + js: `{"request_metadata_key": "x-foo"}`, + want: &LBConfig{ + MinRingSize: defaultMinSize, + MaxRingSize: defaultMaxSize, + RequestMetadataKey: "x-foo", + }, + }, + { + name: "invalid request metadata keys", + js: `{"request_metadata_key": "!invalid"}`, + want: nil, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index b450716fa0f..7446721333b 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -25,21 +25,24 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/status" ) type picker struct { - ring *ring - logger *grpclog.PrefixLogger - subConnStates map[*subConn]connectivity.State + ring *ring + logger *grpclog.PrefixLogger + subConnStates map[*subConn]connectivity.State + requestHashKey string + randuint64 func() uint64 // overridable for testing } -func newPicker(ring *ring, logger 
*grpclog.PrefixLogger) *picker { +func newPicker(ring *ring, requestHashKey string, logger *grpclog.PrefixLogger) *picker { states := make(map[*subConn]connectivity.State) for _, e := range ring.items { states[e.sc] = e.sc.effectiveState() } - return &picker{ring: ring, logger: logger, subConnStates: states} + return &picker{ring: ring, logger: logger, subConnStates: states, requestHashKey: requestHashKey, randuint64: grpcrand.Uint64} } // handleRICSResult is the return type of handleRICS. It's needed to wrap the @@ -55,16 +58,24 @@ type handleRICSResult struct { // or Shutdown. TransientFailure will be handled specifically after this // function returns. // -// The first return value indicates if the state is in Ready, Idle, Connecting +// The second return value indicates if the state is in Ready, Idle, Connecting // or Shutdown. If it's true, the PickResult and error should be returned from // Pick() as is. -func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) { +func (p *picker) handleRICS(e *ringEntry, usingRandomHash bool) (handleRICSResult, bool) { switch state := p.subConnStates[e.sc]; state { case connectivity.Ready: return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true case connectivity.Idle: // Trigger Connect() and queue the pick. e.sc.queueConnect() + if usingRandomHash { + // "If the use of this random hash triggers a connection attempt + // (...), then before queuing the pick, the picker will scan forward + // searching for a subchannel in `READY` state. If it finds a + // subchannel in `READY` state, the picker returns it." - A76 + p, err := p.returnNextReadySubConn(e) + return handleRICSResult{pr: p, err: err}, true + } return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true case connectivity.Connecting: return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true @@ -84,15 +95,19 @@ func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) { } func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - e := p.ring.pick(getRequestHash(info.Ctx)) - if hr, ok := p.handleRICS(e); ok { + h, usingRandomHash := getRequestHash(info.Ctx, p.requestHashKey) + if usingRandomHash { + h = p.randuint64() + } + e := p.ring.pick(h) + if hr, ok := p.handleRICS(e, usingRandomHash); ok { return hr.pr, hr.err } // ok was false, the entry is in transient failure. - return p.handleTransientFailure(e) + return p.handleTransientFailure(e, usingRandomHash) } -func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, error) { +func (p *picker) handleTransientFailure(e *ringEntry, usingRandomHash bool) (balancer.PickResult, error) { // Queue a connect on the first picked SubConn. e.sc.queueConnect() @@ -105,7 +120,7 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro // For the second SubConn, also check Ready/Idle/Connecting as if it's the // first entry. - if hr, ok := p.handleRICS(e2); ok { + if hr, ok := p.handleRICS(e2, usingRandomHash); ok { return hr.pr, hr.err } @@ -148,6 +163,19 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro return balancer.PickResult{}, fmt.Errorf("no connection is Ready") } +// returnNextReadySubConn returns the first entry after e that has its +// subconn in READY state. If no such entry is found, it returns +// balancer.ErrNoSubConnAvailable. 
+func (p *picker) returnNextReadySubConn(e *ringEntry) (balancer.PickResult, error) { + for i := range p.ring.items { + e := p.ring.items[(e.idx+i)%len(p.ring.items)] + if e.sc.state == connectivity.Ready { + return balancer.PickResult{SubConn: e.sc.sc}, nil + } + } + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable +} + // nextSkippingDuplicates finds the next entry in the ring, with a different // subconn from the given entry. func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry { diff --git a/xds/internal/balancer/ringhash/picker_test.go b/xds/internal/balancer/ringhash/picker_test.go index f1dbaf2e5ed..a60b5a06bd4 100644 --- a/xds/internal/balancer/ringhash/picker_test.go +++ b/xds/internal/balancer/ringhash/picker_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/grpclog" igrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/metadata" ) var testSubConns []*testutils.TestSubConn @@ -57,7 +58,7 @@ func newTestRing(cStats []connectivity.State) *ring { return &ring{items: items} } -func (s) TestPickerPickFirstTwo(t *testing.T) { +func (s) TestXdsPickerPickFirstTwo(t *testing.T) { tests := []struct { name string ring *ring @@ -107,9 +108,9 @@ func (s) TestPickerPickFirstTwo(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := newPicker(tt.ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) + p := newPicker(tt.ring, "", igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) got, err := p.Pick(balancer.PickInfo{ - Ctx: SetRequestHash(context.Background(), tt.hash), + Ctx: SetXDSRequestHash(context.Background(), tt.hash), }) if err != tt.wantErr { t.Errorf("Pick() error = %v, wantErr %v", err, tt.wantErr) @@ -129,6 +130,56 @@ func (s) TestPickerPickFirstTwo(t *testing.T) { } } +// TestPickerWithRequestHashKey tests that if an explicit request hash key is +// set, it will be used to pick a SubConn. +func (s) TestPickerWithRequestHashKey(t *testing.T) { + tests := []struct { + name string + values []string + ring *ring + wantSC balancer.SubConn + wantErr error + }{ + { + name: "hash key is not set, pick the first ready SubConn", + ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready}), + wantSC: testSubConns[3], + }, + { + name: "hash key is not set, no subchannel ready", + ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Shutdown}), + wantErr: balancer.ErrNoSubConnAvailable, + }, + { + name: "hash key is set to a single value, connect and queue the pick", + values: []string{"test-value"}, // this hashes to the end of the test ring => endpoint 1 expected. + ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready}), + wantErr: balancer.ErrNoSubConnAvailable, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + requestHashKey := "test-key" + ring := tt.ring + p := newPicker(ring, requestHashKey, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) + p.randuint64 = func() uint64 { return 5 } + + md := metadata.New(nil) + md.Set("test-key", tt.values...) 
+ got, err := p.Pick(balancer.PickInfo{ + Ctx: metadata.NewOutgoingContext(context.Background(), md), + }) + if err != tt.wantErr { + t.Errorf("Pick() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got.SubConn != tt.wantSC { + t.Errorf("Pick() got = %v, want picked SubConn: %v", got, tt.wantSC) + } + }) + } +} + // TestPickerPickTriggerTFConnect covers that if the picked SubConn is // TransientFailures, all SubConns until a non-TransientFailure are queued for // Connect(). @@ -137,8 +188,8 @@ func (s) TestPickerPickTriggerTFConnect(t *testing.T) { connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, }) - p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) - _, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)}) + p := newPicker(ring, "", igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) + _, err := p.Pick(balancer.PickInfo{Ctx: SetXDSRequestHash(context.Background(), 5)}) if err == nil { t.Fatalf("Pick() error = %v, want non-nil", err) } @@ -167,8 +218,8 @@ func (s) TestPickerPickTriggerTFReturnReady(t *testing.T) { ring := newTestRing([]connectivity.State{ connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Ready, }) - p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) - pr, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)}) + p := newPicker(ring, "", igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) + pr, err := p.Pick(balancer.PickInfo{Ctx: SetXDSRequestHash(context.Background(), 5)}) if err != nil { t.Fatalf("Pick() error = %v, want nil", err) } @@ -193,8 +244,8 @@ func (s) TestPickerPickTriggerTFWithIdle(t *testing.T) { ring := newTestRing([]connectivity.State{ connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure, }) - p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) - _, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)}) + p := newPicker(ring, "", igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) + _, err := p.Pick(balancer.PickInfo{Ctx: SetXDSRequestHash(context.Background(), 5)}) if err == balancer.ErrNoSubConnAvailable { t.Fatalf("Pick() error = %v, want %v", err, balancer.ErrNoSubConnAvailable) } diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index e63c6f65390..d465b3f7ed3 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -434,7 +434,7 @@ func (b *ringhashBalancer) regeneratePicker() { b.picker = base.NewErrPicker(b.mergeErrors()) return } - b.picker = newPicker(b.ring, b.logger) + b.picker = newPicker(b.ring, b.config.RequestMetadataKey, b.logger) } func (b *ringhashBalancer) Close() { diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index a1edfe5d228..633690bd556 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -63,7 +63,7 @@ func init() { } func ctxWithHash(h uint64) context.Context { - return SetRequestHash(context.Background(), h) + return SetXDSRequestHash(context.Background(), h) } // 
setupTest creates the balancer, and does an initial sanity check.
@@ -480,6 +480,52 @@ func (s) TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
 	}
 }
 
+// TestRequestHashKeyChanged tests that the ringhash balancer generates a new
+// picker when the request hash key in the LB config changes.
+func (s) TestRequestHashKeyChanged(t *testing.T) {
+	wantAddrs := []resolver.Address{
+		{Addr: testBackendAddrStrs[0]},
+		{Addr: testBackendAddrStrs[1]},
+		{Addr: testBackendAddrStrs[2]},
+	}
+	cc, b, p0 := setupTest(t, wantAddrs)
+	ring0 := p0.(*picker).ring
+
+	if err := b.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: wantAddrs},
+		BalancerConfig: &LBConfig{
+			RequestMetadataKey: "test-key",
+		},
+	}); err != nil {
+		t.Fatalf("UpdateClientConnState returned err: %v", err)
+	}
+	var p1 balancer.Picker
+	select {
+	case p1 = <-cc.NewPickerCh:
+	case <-time.After(defaultTestTimeout):
+		t.Fatalf("timeout waiting for picker after UpdateClientConnState with a new request hash key")
+	}
+	ring1 := p1.(*picker).ring
+	if ring1 == ring0 {
+		t.Fatalf("new picker after changing request hash key has the same ring as before, want different")
+	}
+
+	// Same config, there should be no new picker.
+	if err := b.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: wantAddrs},
+		BalancerConfig: &LBConfig{
+			RequestMetadataKey: "test-key",
+		},
+	}); err != nil {
+		t.Fatalf("UpdateClientConnState returned err: %v", err)
+	}
+	select {
+	case <-cc.NewPickerCh:
+		t.Fatalf("unexpected picker after UpdateClientConnState with the same config")
+	case <-time.After(defaultTestShortTimeout):
+	}
+}
+
 func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) {
 	tests := []struct {
 		name string
diff --git a/xds/internal/balancer/ringhash/util.go b/xds/internal/balancer/ringhash/util.go
index 92bb3ae5b79..617e935d850 100644
--- a/xds/internal/balancer/ringhash/util.go
+++ b/xds/internal/balancer/ringhash/util.go
@@ -18,23 +18,45 @@
 
 package ringhash
 
-import "context"
+import (
+	"context"
+	"strings"
 
-type clusterKey struct{}
+	"github.com/cespare/xxhash/v2"
+	"google.golang.org/grpc/metadata"
+)
 
-func getRequestHash(ctx context.Context) uint64 {
-	requestHash, _ := ctx.Value(clusterKey{}).(uint64)
-	return requestHash
+type xdsHashKey struct{}
+
+// getRequestHash returns the request hash to use for this pick, and whether
+// a random hash was used.
+func getRequestHash(ctx context.Context, requestMetadataKey string) (uint64, bool) {
+	if requestMetadataKey == "" {
+		// No explicit request metadata key, use the hash set by the xDS
+		// resolver.
+		requestHash, _ := ctx.Value(xdsHashKey{}).(uint64)
+		return requestHash, false
+	}
+	md, _ := metadata.FromOutgoingContext(ctx)
+	values := md.Get(requestMetadataKey)
+	if len(values) == 0 || len(values) == 1 && values[0] == "" {
+		// If the header is not present, generate a random hash.
+		return 0, true
+	}
+	joinedValues := strings.Join(values, ",")
+	return xxhash.Sum64String(joinedValues), false
 }
 
-// GetRequestHashForTesting returns the request hash in the context; to be used
+// GetXDSRequestHashForTesting returns the request hash in the context; to be used
 // for testing only.
-func GetRequestHashForTesting(ctx context.Context) uint64 {
-	return getRequestHash(ctx)
+func GetXDSRequestHashForTesting(ctx context.Context) uint64 {
+	// for xDS the random hash is never generated in the picker.
+ h, _ := getRequestHash(ctx, "") + return h } -// SetRequestHash adds the request hash to the context for use in Ring Hash Load -// Balancing. -func SetRequestHash(ctx context.Context, requestHash uint64) context.Context { - return context.WithValue(ctx, clusterKey{}, requestHash) +// SetXDSRequestHash adds the request hash to the context for use in Ring Hash +// Load Balancing using xDS route hash_policy. +func SetXDSRequestHash(ctx context.Context, requestHash uint64) context.Context { + return context.WithValue(ctx, xdsHashKey{}, requestHash) } diff --git a/xds/internal/balancer/ringhash/util_test.go b/xds/internal/balancer/ringhash/util_test.go new file mode 100644 index 00000000000..5202752a828 --- /dev/null +++ b/xds/internal/balancer/ringhash/util_test.go @@ -0,0 +1,65 @@ +package ringhash + +import ( + "context" + "testing" + + "github.com/cespare/xxhash/v2" + "google.golang.org/grpc/metadata" +) + +func (s) TestGetRequestHash(t *testing.T) { + tests := []struct { + name string + requestMetadataKey string + xdsValue uint64 + explicitValue []string + wantHash uint64 + wantRandom bool + }{ + { + name: "xds hash", + xdsValue: 123, + wantHash: 123, + }, + { + name: "explicit key, no value", + requestMetadataKey: "test-key", + wantRandom: true, + }, + { + name: "explicit key, emtpy value", + requestMetadataKey: "test-key", + explicitValue: []string{""}, + wantRandom: true, + }, + { + name: "explicit key, non empty value", + requestMetadataKey: "test-key", + explicitValue: []string{"test-value"}, + wantHash: xxhash.Sum64String("test-value"), + }, + { + name: "explicit key, multiple values", + requestMetadataKey: "test-key", + explicitValue: []string{"test-value", "test-value-2"}, + wantHash: xxhash.Sum64String("test-value,test-value-2"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + if tt.explicitValue != nil { + ctx = metadata.NewOutgoingContext(context.Background(), metadata.MD{"test-key": tt.explicitValue}) + } + if tt.xdsValue != 0 { + ctx = SetXDSRequestHash(context.Background(), tt.xdsValue) + } + gotHash, gotRandom := getRequestHash(ctx, tt.requestMetadataKey) + + if gotHash != tt.wantHash || gotRandom != tt.wantRandom { + t.Errorf("getRequestHash(%v) = (%v, %v), want (%v, %v)", tt.explicitValue, gotRandom, gotHash, tt.wantRandom, tt.wantHash) + } + }) + } +} diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 88cb1d2a1fd..960e087a15e 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -171,7 +171,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP } lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name) - lbCtx = ringhash.SetRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies)) + lbCtx = ringhash.SetXDSRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies)) config := &iresolver.RPCConfig{ // Communicate to the LB policy the chosen cluster and request hash, if Ring Hash LB policy. 
diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 1a0ca4ed5b4..1b18da6332b 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -488,7 +488,7 @@ func (s) TestResolverRequestHash(t *testing.T) { if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } - gotHash := ringhash.GetRequestHashForTesting(res.Context) + gotHash := ringhash.GetXDSRequestHashForTesting(res.Context) wantHash := xxhash.Sum64String("/products") if gotHash != wantHash { t.Fatalf("Got request hash: %v, want: %v", gotHash, wantHash) From 957f7e915ee149070be8a4d06fef1766e619a255 Mon Sep 17 00:00:00 2001 From: Antoine Tollenaere Date: Fri, 26 Apr 2024 16:47:23 +0200 Subject: [PATCH 2/4] vet --- xds/internal/balancer/ringhash/util_test.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/xds/internal/balancer/ringhash/util_test.go b/xds/internal/balancer/ringhash/util_test.go index 5202752a828..d42f4d597d8 100644 --- a/xds/internal/balancer/ringhash/util_test.go +++ b/xds/internal/balancer/ringhash/util_test.go @@ -1,3 +1,21 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package ringhash import ( @@ -28,7 +46,7 @@ func (s) TestGetRequestHash(t *testing.T) { wantRandom: true, }, { - name: "explicit key, emtpy value", + name: "explicit key, empty value", requestMetadataKey: "test-key", explicitValue: []string{""}, wantRandom: true, From 665135f00ec2cadb23e353152b55dff7dd64dc85 Mon Sep 17 00:00:00 2001 From: Antoine Tollenaere Date: Mon, 29 Apr 2024 09:40:21 +0200 Subject: [PATCH 3/4] scan forward after first subconn is idle or in TF --- xds/internal/balancer/ringhash/picker.go | 67 ++++++++++++------- xds/internal/balancer/ringhash/picker_test.go | 66 +++++++++++++----- 2 files changed, 92 insertions(+), 41 deletions(-) diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index 7446721333b..6eb593c4704 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -19,7 +19,7 @@ package ringhash import ( - "fmt" + "errors" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" @@ -29,6 +29,11 @@ import ( "google.golang.org/grpc/status" ) +var ( + errNoSubConnReady = errors.New("no connection is Ready") + errSingleSubConnInTransientFailure = errors.New("the only SubConn is in Transient Failure") +) + type picker struct { ring *ring logger *grpclog.PrefixLogger @@ -46,12 +51,13 @@ func newPicker(ring *ring, requestHashKey string, logger *grpclog.PrefixLogger) } // handleRICSResult is the return type of handleRICS. It's needed to wrap the -// returned error from Pick() in a struct. With this, if the return values are -// `balancer.PickResult, error, bool`, linter complains because error is not the -// last return value. +// returned error from Pick() in a struct and whether we triggered a connection +// attempt. 
Without this, the return values would be `balancer.PickResult, bool, error, bool`, +// and linter would complain because error is not the last return value. type handleRICSResult struct { - pr balancer.PickResult - err error + pr balancer.PickResult + triggeredConnect bool + err error } // handleRICS generates pick result if the entry is in Ready, Idle, Connecting @@ -61,22 +67,14 @@ type handleRICSResult struct { // The second return value indicates if the state is in Ready, Idle, Connecting // or Shutdown. If it's true, the PickResult and error should be returned from // Pick() as is. -func (p *picker) handleRICS(e *ringEntry, usingRandomHash bool) (handleRICSResult, bool) { +func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) { switch state := p.subConnStates[e.sc]; state { case connectivity.Ready: return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true case connectivity.Idle: // Trigger Connect() and queue the pick. e.sc.queueConnect() - if usingRandomHash { - // "If the use of this random hash triggers a connection attempt - // (...), then before queuing the pick, the picker will scan forward - // searching for a subchannel in `READY` state. If it finds a - // subchannel in `READY` state, the picker returns it." - A76 - p, err := p.returnNextReadySubConn(e) - return handleRICSResult{pr: p, err: err}, true - } - return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true + return handleRICSResult{triggeredConnect: true, err: balancer.ErrNoSubConnAvailable}, true case connectivity.Connecting: return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true case connectivity.TransientFailure: @@ -100,7 +98,16 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { h = p.randuint64() } e := p.ring.pick(h) - if hr, ok := p.handleRICS(e, usingRandomHash); ok { + if hr, ok := p.handleRICS(e); ok { + if usingRandomHash && hr.triggeredConnect { + // "If the use of this random hash triggers a connection attempt + // (...), then before queuing the pick, the picker will scan forward + // searching for a subchannel in `READY` state. If it finds a + // subchannel in `READY` state, the picker returns it." - A76 + if p := p.nextReadySubConn(e); p != nil { + return balancer.PickResult{SubConn: p}, nil + } + } return hr.pr, hr.err } // ok was false, the entry is in transient failure. @@ -110,17 +117,26 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { func (p *picker) handleTransientFailure(e *ringEntry, usingRandomHash bool) (balancer.PickResult, error) { // Queue a connect on the first picked SubConn. e.sc.queueConnect() + if usingRandomHash { + // "If the use of this random hash triggers a connection attempt + // (...), then before queuing the pick, the picker will scan forward + // searching for a subchannel in `READY` state. If it finds a + // subchannel in `READY` state, the picker returns it." - A76 + if p := p.nextReadySubConn(e); p != nil { + return balancer.PickResult{SubConn: p}, nil + } + } // Find next entry in the ring, skipping duplicate SubConns. e2 := nextSkippingDuplicates(p.ring, e) if e2 == nil { // There's no next entry available, fail the pick. - return balancer.PickResult{}, fmt.Errorf("the only SubConn is in Transient Failure") + return balancer.PickResult{}, errSingleSubConnInTransientFailure } // For the second SubConn, also check Ready/Idle/Connecting as if it's the // first entry. 
- if hr, ok := p.handleRICS(e2, usingRandomHash); ok { + if hr, ok := p.handleRICS(e2); ok { return hr.pr, hr.err } @@ -160,20 +176,19 @@ func (p *picker) handleTransientFailure(e *ringEntry, usingRandomHash bool) (bal ee.sc.queueConnect() } } - return balancer.PickResult{}, fmt.Errorf("no connection is Ready") + return balancer.PickResult{}, errNoSubConnReady } -// returnNextReadySubConn returns the first entry after e that has its -// subconn in READY state. If no such entry is found, it returns -// balancer.ErrNoSubConnAvailable. -func (p *picker) returnNextReadySubConn(e *ringEntry) (balancer.PickResult, error) { +// nextReadySubConn returns the first entry after e that has its +// subconn in READY state. If no such entry is found, a PickResult with a +func (p *picker) nextReadySubConn(e *ringEntry) balancer.SubConn { for i := range p.ring.items { e := p.ring.items[(e.idx+i)%len(p.ring.items)] if e.sc.state == connectivity.Ready { - return balancer.PickResult{SubConn: e.sc.sc}, nil + return e.sc.sc } } - return balancer.PickResult{}, balancer.ErrNoSubConnAvailable + return nil } // nextSkippingDuplicates finds the next entry in the ring, with a different diff --git a/xds/internal/balancer/ringhash/picker_test.go b/xds/internal/balancer/ringhash/picker_test.go index a60b5a06bd4..b6bd3d6fd1e 100644 --- a/xds/internal/balancer/ringhash/picker_test.go +++ b/xds/internal/balancer/ringhash/picker_test.go @@ -105,6 +105,12 @@ func (s) TestXdsPickerPickFirstTwo(t *testing.T) { wantErr: balancer.ErrNoSubConnAvailable, wantSCToConnect: testSubConns[1], }, + { + name: "single channel in TransientFailure", + ring: newTestRing([]connectivity.State{connectivity.TransientFailure}), + hash: 5, + wantErr: errSingleSubConnInTransientFailure, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -134,27 +140,47 @@ func (s) TestXdsPickerPickFirstTwo(t *testing.T) { // set, it will be used to pick a SubConn. 
func (s) TestPickerWithRequestHashKey(t *testing.T) { tests := []struct { - name string - values []string - ring *ring - wantSC balancer.SubConn - wantErr error + name string + values []string + ring *ring + wantSC balancer.SubConn + wantSCToConnect balancer.SubConn + wantErr error }{ { - name: "hash key is not set, pick the first ready SubConn", - ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready}), + name: "hash key is not set, random picked an idle subconn, pick first ready SubConn", + ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready, connectivity.Ready}), + wantSC: testSubConns[3], + wantSCToConnect: testSubConns[0], + }, + { + name: "hash key is not set, random picks an idle subconn, no subconn ready", + ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Shutdown}), + wantErr: balancer.ErrNoSubConnAvailable, + wantSCToConnect: testSubConns[0], + }, + { + name: "hash key is not set, random picks a subconn in transient failure, pick first subchannel ready", + ring: newTestRing([]connectivity.State{connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready, connectivity.Ready}), wantSC: testSubConns[3], }, { - name: "hash key is not set, no subchannel ready", - ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Shutdown}), - wantErr: balancer.ErrNoSubConnAvailable, + name: "hash key is not set, random picked a subconn in transient failure, no subchannel ready", + ring: newTestRing([]connectivity.State{connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Connecting}), + wantErr: errNoSubConnReady, }, { - name: "hash key is set to a single value, connect and queue the pick", - values: []string{"test-value"}, // this hashes to the end of the test ring => endpoint 1 expected. - ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready}), - wantErr: balancer.ErrNoSubConnAvailable, + name: "hash key is set, return ready subconn", + values: []string{"test-value"}, // this hashes to the end of the test ring => ring entry 0 expected. + ring: newTestRing([]connectivity.State{connectivity.Ready, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready}), + wantSC: testSubConns[0], + }, + { + name: "hash key is set, connect and queue the pick", + values: []string{"test-value"}, // this hashes to the end of the test ring => ring entry 0 expected. + ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready}), + wantErr: balancer.ErrNoSubConnAvailable, + wantSCToConnect: testSubConns[0], }, } for _, tt := range tests { @@ -162,7 +188,10 @@ func (s) TestPickerWithRequestHashKey(t *testing.T) { requestHashKey := "test-key" ring := tt.ring p := newPicker(ring, requestHashKey, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) - p.randuint64 = func() uint64 { return 5 } + p.randuint64 = func() uint64 { + // always return the first entry on the ring. + return 5 + } md := metadata.New(nil) md.Set("test-key", tt.values...) 
@@ -176,6 +205,13 @@ func (s) TestPickerWithRequestHashKey(t *testing.T) { if got.SubConn != tt.wantSC { t.Errorf("Pick() got = %v, want picked SubConn: %v", got, tt.wantSC) } + if sc := tt.wantSCToConnect; sc != nil { + select { + case <-sc.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestShortTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc) + } + } }) } } From 852363bcaacf71fa013e75ef6d6d1acff6fe841d Mon Sep 17 00:00:00 2001 From: Antoine Tollenaere Date: Wed, 22 May 2024 11:45:15 +0200 Subject: [PATCH 4/4] comments from arvind --- internal/metadata/metadata.go | 30 ++++--- xds/internal/balancer/ringhash/config.go | 9 +- xds/internal/balancer/ringhash/config_test.go | 6 ++ xds/internal/balancer/ringhash/picker.go | 52 ++++++++---- xds/internal/balancer/ringhash/picker_test.go | 62 ++++++++++++++ xds/internal/balancer/ringhash/util.go | 34 ++------ xds/internal/balancer/ringhash/util_test.go | 83 ------------------- xds/internal/resolver/xds_resolver_test.go | 2 +- 8 files changed, 136 insertions(+), 142 deletions(-) delete mode 100644 xds/internal/balancer/ringhash/util_test.go diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go index 900bfb71608..8f5f0d229c9 100644 --- a/internal/metadata/metadata.go +++ b/internal/metadata/metadata.go @@ -97,13 +97,30 @@ func hasNotPrintable(msg string) bool { return false } -// ValidatePair validate a key-value pair with the following rules (the pseudo-header will be skipped) : +// ValidatePair validates a key-value pair with the following rules (the pseudo-header will be skipped) : // // - key must contain one or more characters. // - the characters in the key must be contained in [0-9 a-z _ - .]. // - if the key ends with a "-bin" suffix, no validation of the corresponding value is performed. -// - the characters in the every value must be printable (in [%x20-%x7E]). +// - the characters in every value must be printable (in [%x20-%x7E]). 
func ValidatePair(key string, vals ...string) error { + if err := ValidateKey(key); err != nil { + return err + } + if strings.HasSuffix(key, "-bin") { + return nil + } + // check value + for _, val := range vals { + if hasNotPrintable(val) { + return fmt.Errorf("header key %q contains value with non-printable ASCII characters", key) + } + } + return nil +} + +// ValidateKey validates +func ValidateKey(key string) error { // key should not be empty if key == "" { return fmt.Errorf("there is an empty key in the header") @@ -119,14 +136,5 @@ func ValidatePair(key string, vals ...string) error { return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", key) } } - if strings.HasSuffix(key, "-bin") { - return nil - } - // check value - for _, val := range vals { - if hasNotPrintable(val) { - return fmt.Errorf("header key %q contains value with non-printable ASCII characters", key) - } - } return nil } diff --git a/xds/internal/balancer/ringhash/config.go b/xds/internal/balancer/ringhash/config.go index 23368fe97f6..dc05a6c9677 100644 --- a/xds/internal/balancer/ringhash/config.go +++ b/xds/internal/balancer/ringhash/config.go @@ -21,6 +21,7 @@ package ringhash import ( "encoding/json" "fmt" + "strings" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/metadata" @@ -70,8 +71,12 @@ func parseConfig(c json.RawMessage) (*LBConfig, error) { cfg.MaxRingSize = envconfig.RingHashCap } if cfg.RequestMetadataKey != "" { - if err := metadata.ValidatePair(cfg.RequestMetadataKey, ""); err != nil { - return nil, fmt.Errorf("invalid request_metadata_key %q: %w", cfg.RequestMetadataKey, err) + // See rules in https://github.com/grpc/proposal/blob/54074388ca49e7c8eb1060af238ce98a63ad9daa/A76-ring-hash-improvements.md#explicitly-setting-the-request-hash-key + if err := metadata.ValidateKey(cfg.RequestMetadataKey); err != nil { + return nil, fmt.Errorf("invalid request_metadata_key %q: %s", cfg.RequestMetadataKey, err) + } + if strings.HasSuffix(cfg.RequestMetadataKey, "-bin") { + return nil, fmt.Errorf("invalid request_metadata_key %q: key must not end with \"-bin\"", cfg.RequestMetadataKey) } } return &cfg, nil diff --git a/xds/internal/balancer/ringhash/config_test.go b/xds/internal/balancer/ringhash/config_test.go index 107163b827f..b37384f7b8c 100644 --- a/xds/internal/balancer/ringhash/config_test.go +++ b/xds/internal/balancer/ringhash/config_test.go @@ -109,6 +109,12 @@ func (s) TestParseConfig(t *testing.T) { want: nil, wantErr: true, }, + { + name: "binary request metadata keys", + js: `{"request_metadata_key": "header-with-bin"}`, + want: nil, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index 6eb593c4704..2511c1c72de 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -19,13 +19,17 @@ package ringhash import ( + "context" "errors" + "strings" + "github.com/cespare/xxhash/v2" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcrand" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -52,8 +56,9 @@ func newPicker(ring *ring, requestHashKey string, logger *grpclog.PrefixLogger) // handleRICSResult is the return type of handleRICS. 
It's needed to wrap the // returned error from Pick() in a struct and whether we triggered a connection -// attempt. Without this, the return values would be `balancer.PickResult, bool, error, bool`, -// and linter would complain because error is not the last return value. +// attempt. Without this, the return values would be `balancer.PickResult, bool, +// error, bool`, and linter would complain because error is not the last return +// value. type handleRICSResult struct { pr balancer.PickResult triggeredConnect bool @@ -93,13 +98,10 @@ func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) { } func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - h, usingRandomHash := getRequestHash(info.Ctx, p.requestHashKey) - if usingRandomHash { - h = p.randuint64() - } + h, usesRandomHash := p.getRequestHash(info.Ctx) e := p.ring.pick(h) if hr, ok := p.handleRICS(e); ok { - if usingRandomHash && hr.triggeredConnect { + if usesRandomHash && hr.triggeredConnect { // "If the use of this random hash triggers a connection attempt // (...), then before queuing the pick, the picker will scan forward // searching for a subchannel in `READY` state. If it finds a @@ -111,13 +113,32 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { return hr.pr, hr.err } // ok was false, the entry is in transient failure. - return p.handleTransientFailure(e, usingRandomHash) + return p.handleTransientFailure(e, usesRandomHash) +} + +// getRequestHash returns the request hash to use for this pick, and whether +// a random hash was used. +func (p *picker) getRequestHash(ctx context.Context) (uint64, bool) { + if p.requestHashKey == "" { + // No explicit request metadata key, use the hash set by the xDS + // resolver. Note that for xDS, the random hash is never generated + // in the picker. + return GetXDSRequestHash(ctx), false + } + md, _ := metadata.FromOutgoingContext(ctx) + values := md.Get(p.requestHashKey) + if len(values) == 0 || len(values) == 1 && values[0] == "" { + // If the header is not present, generate a random hash. + return p.randuint64(), true + } + joinedValues := strings.Join(values, ",") + return xxhash.Sum64String(joinedValues), false } -func (p *picker) handleTransientFailure(e *ringEntry, usingRandomHash bool) (balancer.PickResult, error) { +func (p *picker) handleTransientFailure(e *ringEntry, usesRandomHash bool) (balancer.PickResult, error) { // Queue a connect on the first picked SubConn. e.sc.queueConnect() - if usingRandomHash { + if usesRandomHash { // "If the use of this random hash triggers a connection attempt // (...), then before queuing the pick, the picker will scan forward // searching for a subchannel in `READY` state. If it finds a @@ -179,13 +200,12 @@ func (p *picker) handleTransientFailure(e *ringEntry, usingRandomHash bool) (bal return balancer.PickResult{}, errNoSubConnReady } -// nextReadySubConn returns the first entry after e that has its -// subconn in READY state. If no such entry is found, a PickResult with a +// nextReadySubConn returns the first entry after e that has its subconn in +// READY state. If no such entry is found, it returns nil. 
func (p *picker) nextReadySubConn(e *ringEntry) balancer.SubConn { - for i := range p.ring.items { - e := p.ring.items[(e.idx+i)%len(p.ring.items)] - if e.sc.state == connectivity.Ready { - return e.sc.sc + for next := p.ring.next(e); next != e; next = p.ring.next(next) { + if next.sc.state == connectivity.Ready { + return next.sc.sc } } return nil diff --git a/xds/internal/balancer/ringhash/picker_test.go b/xds/internal/balancer/ringhash/picker_test.go index 48abbde4c69..72afd9df24f 100644 --- a/xds/internal/balancer/ringhash/picker_test.go +++ b/xds/internal/balancer/ringhash/picker_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/cespare/xxhash/v2" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" @@ -381,3 +382,64 @@ func (s) TestNextSkippingDuplicatesOnlyDup(t *testing.T) { t.Errorf("nextSkippingDuplicates() = %v, want nil", got) } } +func (s) TestGetRequestHash(t *testing.T) { + tests := []struct { + name string + requestMetadataKey string + xdsValue uint64 + explicitValue []string + wantHash uint64 + wantRandom bool + }{ + { + name: "xds hash", + xdsValue: 123, + wantHash: 123, + }, + { + name: "explicit key, no value", + requestMetadataKey: "test-key", + wantHash: 42, + wantRandom: true, + }, + { + name: "explicit key, empty value", + requestMetadataKey: "test-key", + explicitValue: []string{""}, + wantHash: 42, + wantRandom: true, + }, + { + name: "explicit key, non empty value", + requestMetadataKey: "test-key", + explicitValue: []string{"test-value"}, + wantHash: xxhash.Sum64String("test-value"), + }, + { + name: "explicit key, multiple values", + requestMetadataKey: "test-key", + explicitValue: []string{"test-value", "test-value-2"}, + wantHash: xxhash.Sum64String("test-value,test-value-2"), + }, + } + testRing := newTestRing([]connectivity.State{connectivity.Idle}) + for _, tt := range tests { + p := newPicker(testRing, tt.requestMetadataKey, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) + p.randuint64 = func() uint64 { return 42 } + + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + if tt.explicitValue != nil { + ctx = metadata.NewOutgoingContext(context.Background(), metadata.MD{"test-key": tt.explicitValue}) + } + if tt.xdsValue != 0 { + ctx = SetXDSRequestHash(context.Background(), tt.xdsValue) + } + gotHash, gotRandom := p.getRequestHash(ctx) + + if gotHash != tt.wantHash || gotRandom != tt.wantRandom { + t.Errorf("getRequestHash(%v) = (%v, %v), want (%v, %v)", tt.explicitValue, gotRandom, gotHash, tt.wantRandom, tt.wantHash) + } + }) + } +} diff --git a/xds/internal/balancer/ringhash/util.go b/xds/internal/balancer/ringhash/util.go index 617e935d850..53453be6404 100644 --- a/xds/internal/balancer/ringhash/util.go +++ b/xds/internal/balancer/ringhash/util.go @@ -20,39 +20,15 @@ package ringhash import ( "context" - "strings" - - "github.com/cespare/xxhash/v2" - "google.golang.org/grpc/metadata" ) type xdsHashKey struct{} -// getRequestHash returns the request hash to use for this pick, and whether -// a random hash was used. -func getRequestHash(ctx context.Context, requestMetadataKey string) (uint64, bool) { - if requestMetadataKey == "" { - // No explicit request metadata key, use the hash set by the xDS - // resolver. 
- requestHash, _ := ctx.Value(xdsHashKey{}).(uint64) - return requestHash, false - } - md, _ := metadata.FromOutgoingContext(ctx) - values := md.Get(requestMetadataKey) - if len(values) == 0 || len(values) == 1 && values[0] == "" { - // If the header is not present, generate a random hash. - return 0, true - } - joinedValues := strings.Join(values, ",") - return xxhash.Sum64String(joinedValues), false -} - -// GetXDSRequestHashForTesting returns the request hash in the context; to be used -// for testing only. -func GetXDSRequestHashForTesting(ctx context.Context) uint64 { - // for xDS the random hash is never generated in the picker. - h, _ := getRequestHash(ctx, "") - return h +// GetXDSRequestHash returns the request hash in the context, set from the +// xDS config selector. +func GetXDSRequestHash(ctx context.Context) uint64 { + requestHash, _ := ctx.Value(xdsHashKey{}).(uint64) + return requestHash } // SetXDSRequestHash adds the request hash to the context for use in Ring Hash diff --git a/xds/internal/balancer/ringhash/util_test.go b/xds/internal/balancer/ringhash/util_test.go deleted file mode 100644 index d42f4d597d8..00000000000 --- a/xds/internal/balancer/ringhash/util_test.go +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright 2024 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package ringhash - -import ( - "context" - "testing" - - "github.com/cespare/xxhash/v2" - "google.golang.org/grpc/metadata" -) - -func (s) TestGetRequestHash(t *testing.T) { - tests := []struct { - name string - requestMetadataKey string - xdsValue uint64 - explicitValue []string - wantHash uint64 - wantRandom bool - }{ - { - name: "xds hash", - xdsValue: 123, - wantHash: 123, - }, - { - name: "explicit key, no value", - requestMetadataKey: "test-key", - wantRandom: true, - }, - { - name: "explicit key, empty value", - requestMetadataKey: "test-key", - explicitValue: []string{""}, - wantRandom: true, - }, - { - name: "explicit key, non empty value", - requestMetadataKey: "test-key", - explicitValue: []string{"test-value"}, - wantHash: xxhash.Sum64String("test-value"), - }, - { - name: "explicit key, multiple values", - requestMetadataKey: "test-key", - explicitValue: []string{"test-value", "test-value-2"}, - wantHash: xxhash.Sum64String("test-value,test-value-2"), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - if tt.explicitValue != nil { - ctx = metadata.NewOutgoingContext(context.Background(), metadata.MD{"test-key": tt.explicitValue}) - } - if tt.xdsValue != 0 { - ctx = SetXDSRequestHash(context.Background(), tt.xdsValue) - } - gotHash, gotRandom := getRequestHash(ctx, tt.requestMetadataKey) - - if gotHash != tt.wantHash || gotRandom != tt.wantRandom { - t.Errorf("getRequestHash(%v) = (%v, %v), want (%v, %v)", tt.explicitValue, gotRandom, gotHash, tt.wantRandom, tt.wantHash) - } - }) - } -} diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 8f328047fce..eedeb5cb597 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -488,7 +488,7 @@ func (s) TestResolverRequestHash(t *testing.T) { if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } - gotHash := ringhash.GetXDSRequestHashForTesting(res.Context) + gotHash := ringhash.GetXDSRequestHash(res.Context) wantHash := xxhash.Sum64String("/products") if gotHash != wantHash { t.Fatalf("Got request hash: %v, want: %v", gotHash, wantHash)