Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ringhash: make endpoint hash key configurable, and set it from EDS #7161

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/envconfig/envconfig.go
Expand Up @@ -46,6 +46,9 @@ var (
// by setting the environment variable "GRPC_ENFORCE_ALPN_ENABLED" to "true"
// or "false".
EnforceALPNEnabled = boolFromEnv("GRPC_ENFORCE_ALPN_ENABLED", false)
// XDSEndpointHashKeyBackwardCompat is set to true if we should not parse
// the xds endpoint hash key from EDS LbEndpoint metadata.
XDSEndpointHashKeyBackwardCompat = boolFromEnv("GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT", false)
)

func boolFromEnv(envVar string, def bool) bool {
Expand Down
59 changes: 59 additions & 0 deletions resolver/ringhash/attr.go
@@ -0,0 +1,59 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package ringhash implements resolver related functions for the ring_hash
// load balancing policy.
package ringhash

import (
"google.golang.org/grpc/resolver"
)

// hashKeyKey is the key to store the ring hash key attribute in
// resolver.Address attribute.
const hashKeyKey = hashKeyType("hash_key")

type hashKeyType string

// SetAddrHashKey sets the hash key for this address. Combined with the
// ring_hash load balancing policy, it allows placing the address on the ring
// based on an arbitrary string instead of the IP address.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetAddrHashKey(addr resolver.Address, hashKey string) resolver.Address {
if hashKey == "" {
return addr
}
addr.BalancerAttributes = addr.BalancerAttributes.WithValue(hashKeyKey, hashKey)
return addr
}

// GetAddrHashKey returns the ring hash key attribute of addr. If this attribute
// is not set, it returns an empty string.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func GetAddrHashKey(addr resolver.Address) string {
hashKey, _ := addr.BalancerAttributes.Value(hashKeyKey).(string)
return hashKey
}
2 changes: 2 additions & 0 deletions xds/internal/balancer/clusterresolver/configbuilder.go
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/internal/hierarchy"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/ringhash"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
Expand Down Expand Up @@ -276,6 +277,7 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority
ew = endpoint.Weight
}
addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew})
addr = ringhash.SetAddrHashKey(addr, endpoint.HashKey)
addrs = append(addrs, addr)
}
}
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/ringhash/picker_test.go
Expand Up @@ -48,9 +48,9 @@ func newTestRing(cStats []connectivity.State) *ring {
idx: i,
hash: uint64((i + 1) * 10),
sc: &subConn{
addr: testSC.String(),
sc: testSC,
state: st,
hashKey: testSC.String(),
sc: testSC,
state: st,
},
})
}
Expand Down
8 changes: 4 additions & 4 deletions xds/internal/balancer/ringhash/ring.go
Expand Up @@ -23,7 +23,7 @@ import (
"sort"
"strconv"

xxhash "github.com/cespare/xxhash/v2"
"github.com/cespare/xxhash/v2"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/resolver"
)
Expand Down Expand Up @@ -101,7 +101,7 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64, log
// updates.
idx := 0
for currentHashes < targetHashes {
h := xxhash.Sum64String(scw.sc.addr + "_" + strconv.Itoa(idx))
h := xxhash.Sum64String(scw.sc.hashKey + "_" + strconv.Itoa(idx))
items = append(items, &ringEntry{hash: h, sc: scw.sc})
idx++
currentHashes++
Expand Down Expand Up @@ -141,13 +141,13 @@ func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float
min = nw
}
}
// Sort the addresses to return consistent results.
// Sort the hash keys to return consistent results.
//
// Note: this might not be necessary, but this makes sure the ring is
// consistent as long as the addresses are the same, for example, in cases
// where an address is added and then removed, the RPCs will still pick the
// same old SubConn.
sort.Slice(ret, func(i, j int) bool { return ret[i].sc.addr < ret[j].sc.addr })
sort.Slice(ret, func(i, j int) bool { return ret[i].sc.hashKey < ret[j].sc.hashKey })
return ret, min
}

Expand Down
11 changes: 6 additions & 5 deletions xds/internal/balancer/ringhash/ring_test.go
Expand Up @@ -23,7 +23,7 @@ import (
"math"
"testing"

xxhash "github.com/cespare/xxhash/v2"
"github.com/cespare/xxhash/v2"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/resolver"
)
Expand All @@ -38,9 +38,9 @@ func init() {
testAddr("c", 4),
}
testSubConnMap = resolver.NewAddressMap()
testSubConnMap.Set(testAddrs[0], &subConn{addr: "a"})
testSubConnMap.Set(testAddrs[1], &subConn{addr: "b"})
testSubConnMap.Set(testAddrs[2], &subConn{addr: "c"})
testSubConnMap.Set(testAddrs[0], &subConn{hashKey: "a"})
testSubConnMap.Set(testAddrs[1], &subConn{hashKey: "b"})
testSubConnMap.Set(testAddrs[2], &subConn{hashKey: "c"})
}

func testAddr(addr string, weight uint32) resolver.Address {
Expand All @@ -60,7 +60,8 @@ func (s) TestRingNew(t *testing.T) {
for _, a := range testAddrs {
var count int
for _, ii := range r.items {
if ii.sc.addr == a.Addr {
// In those tests the hash key is the default, which is the address.
if ii.sc.hashKey == a.Addr {
count++
}
}
Expand Down
35 changes: 24 additions & 11 deletions xds/internal/balancer/ringhash/ringhash.go
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/ringhash"
"google.golang.org/grpc/serviceconfig"
)

Expand Down Expand Up @@ -65,10 +66,10 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
}

type subConn struct {
addr string
weight uint32
sc balancer.SubConn
logger *grpclog.PrefixLogger
hashKey string
weight uint32
sc balancer.SubConn
logger *grpclog.PrefixLogger

mu sync.RWMutex
// This is the actual state of this SubConn (as updated by the ClientConn).
Expand Down Expand Up @@ -207,6 +208,7 @@ type ringhashBalancer struct {
// - an address was added
// - an address was removed
// - an address's weight was updated
// - an address's hash key was updated
//
// Note that this function doesn't trigger SubConn connecting, so all the new
// SubConn states are Idle.
Expand All @@ -217,6 +219,10 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
for _, addr := range addrs {
addrsSet.Set(addr, true)
newWeight := getWeightAttribute(addr)
newHashKey := ringhash.GetAddrHashKey(addr)
if newHashKey == "" {
newHashKey = addr.Addr
}
if val, ok := b.subConns.Get(addr); !ok {
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
Expand All @@ -228,28 +234,35 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
b.logger.Warningf("Failed to create new SubConn: %v", err)
continue
}
scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
scs := &subConn{hashKey: newHashKey, weight: newWeight, sc: sc}
scs.logger = subConnPrefixLogger(b, scs)
scs.setState(connectivity.Idle)
b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
b.subConns.Set(addr, scs)
b.scStates[sc] = scs
addrsUpdated = true
} else {
// We have seen this address before and created a subConn for it. If the
// weight associated with the address has changed, update the subConns map
// with the new weight. This will be used when a new ring is created.
// We have seen this address before and created a subConn for it. If
// the weight or the hash key associated with the address has
// changed, update the subConns map with the new weight and/or hash
// key. This will be used when a new ring is created.
//
// There is no need to call UpdateAddresses on the subConn at this point
// since *only* the weight attribute has changed, and that does not affect
// subConn uniqueness.
// There is no need to call UpdateAddresses on the subConn at this
// point since *only* the weight and/or hash key attribute has
// changed, and that does not affect subConn uniqueness.
scInfo := val.(*subConn)
if oldWeight := scInfo.weight; oldWeight != newWeight {
scInfo.weight = newWeight
b.subConns.Set(addr, scInfo)
// Return true to force recreation of the ring.
addrsUpdated = true
}
if oldHashKey := scInfo.hashKey; oldHashKey != newHashKey {
scInfo.hashKey = newHashKey
b.subConns.Set(addr, scInfo)
// Return true to force recreation of the ring.
addrsUpdated = true
}
}
}
for _, addr := range b.subConns.Keys() {
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/ringhash/ringhash_test.go
Expand Up @@ -33,7 +33,7 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/resolver/ringhash"
)

var (
Expand Down Expand Up @@ -538,10 +538,10 @@ func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) {
// the pointer is different. This test verifies that subConns are not recreated
// in this scenario.
func (s) TestAddrBalancerAttributesChange(t *testing.T) {
addrs1 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
addrs1 := []resolver.Address{ringhash.SetAddrHashKey(resolver.Address{Addr: testBackendAddrStrs[0]}, "test_key")}
cc, b, _ := setupTest(t, addrs1)

addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
addrs2 := []resolver.Address{ringhash.SetAddrHashKey(resolver.Address{Addr: testBackendAddrStrs[0]}, "test_key")}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs2},
BalancerConfig: testConfig,
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/resolver/serviceconfig.go
Expand Up @@ -27,7 +27,7 @@ import (
"sync/atomic"
"time"

xxhash "github.com/cespare/xxhash/v2"
"github.com/cespare/xxhash/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcutil"
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/resolver/serviceconfig_test.go
Expand Up @@ -23,7 +23,7 @@ import (
"regexp"
"testing"

xxhash "github.com/cespare/xxhash/v2"
"github.com/cespare/xxhash/v2"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/grpcutil"
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/resolver/xds_resolver_test.go
Expand Up @@ -25,7 +25,7 @@ import (
"testing"
"time"

xxhash "github.com/cespare/xxhash/v2"
"github.com/cespare/xxhash/v2"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
Expand Down
1 change: 1 addition & 0 deletions xds/internal/xdsclient/xdsresource/type_eds.go
Expand Up @@ -52,6 +52,7 @@ type Endpoint struct {
Address string
HealthStatus EndpointHealthStatus
Weight uint32
HashKey string
}

// Locality contains information of a locality.
Expand Down
23 changes: 23 additions & 0 deletions xds/internal/xdsclient/xdsresource/unmarshal_eds.go
Expand Up @@ -26,6 +26,7 @@ import (
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/xds/internal"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -98,15 +99,37 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs
return nil, fmt.Errorf("duplicate endpoint with the same address %s", addr)
}
uniqueEndpointAddrs[addr] = true

hashKey := getHashKey(lbEndpoint)
endpoints = append(endpoints, Endpoint{
HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
Address: addr,
Weight: weight,
HashKey: hashKey,
})
}
return endpoints, nil
}

// getHashKey extracts and returns the hash key from the given LbEndpoint. If no
// hash key is found, it returns an empty string.
func getHashKey(lbEndpoint *v3endpointpb.LbEndpoint) string {
// "The xDS resolver, described in A74, will be changed to set the hash_key
// endpoint attribute to the value of LbEndpoint.Metadata envoy.lb hash_key
// field, as described in Envoy's documentation for the ring hash load
// balancer." - A76
if envconfig.XDSEndpointHashKeyBackwardCompat {
return ""
}
envoyLB := lbEndpoint.GetMetadata().GetFilterMetadata()["envoy.lb"]
if envoyLB != nil {
if h := envoyLB.GetFields()["hash_key"]; h != nil {
return h.GetStringValue()
}
}
return ""
}

func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, error) {
ret := EndpointsUpdate{}
for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
Expand Down