Skip to content

Commit

Permalink
ringhash: make endpoint hash key configurable, and set it from EDS
Browse files Browse the repository at this point in the history
Implement part 2 of A76: the ability to specify the location of endpoints on the
ring, and a default implementation based on EDS metadata that matches Envoy
behavior.
  • Loading branch information
atollena committed Apr 23, 2024
1 parent 34c7675 commit e1bf6bf
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 29 deletions.
59 changes: 59 additions & 0 deletions resolver/ringhash/attr.go
@@ -0,0 +1,59 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package ringhash implements resolver related functions for the ring_hash
// load balancing policy.
package ringhash

import (
"google.golang.org/grpc/resolver"
)

// hashKeyKey is the key to store the ring hash key attribute in
// resolver.Address attribute.
const hashKeyKey = hashKeyType("hash_key")

type hashKeyType string

// SetAddrHashKey sets the hash key for this address. Combined with the
// ring_hash load balancing policy, it allows placing the address on the ring
// based on an arbitrary string instead of the IP address.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetAddrHashKey(addr resolver.Address, hashKey string) resolver.Address {
if hashKey == "" {
return addr
}
addr.BalancerAttributes = addr.BalancerAttributes.WithValue(hashKeyKey, hashKey)
return addr
}

// GetAddrHashKey returns the ring hash key attribute of addr. If this attribute
// is not set, it returns an empty string.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func GetAddrHashKey(addr resolver.Address) string {
hashKey, _ := addr.BalancerAttributes.Value(hashKeyKey).(string)
return hashKey
}
2 changes: 2 additions & 0 deletions xds/internal/balancer/clusterresolver/configbuilder.go
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/internal/hierarchy"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/ringhash"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
Expand Down Expand Up @@ -276,6 +277,7 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority
ew = endpoint.Weight
}
addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew})
addr = ringhash.SetAddrHashKey(addr, endpoint.HashKey)
addrs = append(addrs, addr)
}
}
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/ringhash/picker_test.go
Expand Up @@ -48,9 +48,9 @@ func newTestRing(cStats []connectivity.State) *ring {
idx: i,
hash: uint64((i + 1) * 10),
sc: &subConn{
addr: testSC.String(),
sc: testSC,
state: st,
hashKey: testSC.String(),
sc: testSC,
state: st,
},
})
}
Expand Down
8 changes: 4 additions & 4 deletions xds/internal/balancer/ringhash/ring.go
Expand Up @@ -23,7 +23,7 @@ import (
"sort"
"strconv"

xxhash "github.com/cespare/xxhash/v2"
"github.com/cespare/xxhash/v2"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/resolver"
)
Expand Down Expand Up @@ -101,7 +101,7 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64, log
// updates.
idx := 0
for currentHashes < targetHashes {
h := xxhash.Sum64String(scw.sc.addr + "_" + strconv.Itoa(idx))
h := xxhash.Sum64String(scw.sc.hashKey + "_" + strconv.Itoa(idx))
items = append(items, &ringEntry{hash: h, sc: scw.sc})
idx++
currentHashes++
Expand Down Expand Up @@ -141,13 +141,13 @@ func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float
min = nw
}
}
// Sort the addresses to return consistent results.
// Sort the hash keys to return consistent results.
//
// Note: this might not be necessary, but this makes sure the ring is
// consistent as long as the addresses are the same, for example, in cases
// where an address is added and then removed, the RPCs will still pick the
// same old SubConn.
sort.Slice(ret, func(i, j int) bool { return ret[i].sc.addr < ret[j].sc.addr })
sort.Slice(ret, func(i, j int) bool { return ret[i].sc.hashKey < ret[j].sc.hashKey })
return ret, min
}

Expand Down
11 changes: 6 additions & 5 deletions xds/internal/balancer/ringhash/ring_test.go
Expand Up @@ -23,7 +23,7 @@ import (
"math"
"testing"

xxhash "github.com/cespare/xxhash/v2"
"github.com/cespare/xxhash/v2"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/resolver"
)
Expand All @@ -38,9 +38,9 @@ func init() {
testAddr("c", 4),
}
testSubConnMap = resolver.NewAddressMap()
testSubConnMap.Set(testAddrs[0], &subConn{addr: "a"})
testSubConnMap.Set(testAddrs[1], &subConn{addr: "b"})
testSubConnMap.Set(testAddrs[2], &subConn{addr: "c"})
testSubConnMap.Set(testAddrs[0], &subConn{hashKey: "a"})
testSubConnMap.Set(testAddrs[1], &subConn{hashKey: "b"})
testSubConnMap.Set(testAddrs[2], &subConn{hashKey: "c"})
}

func testAddr(addr string, weight uint32) resolver.Address {
Expand All @@ -60,7 +60,8 @@ func (s) TestRingNew(t *testing.T) {
for _, a := range testAddrs {
var count int
for _, ii := range r.items {
if ii.sc.addr == a.Addr {
// In those tests the hash key is the default, which is the address.
if ii.sc.hashKey == a.Addr {
count++
}
}
Expand Down
35 changes: 24 additions & 11 deletions xds/internal/balancer/ringhash/ringhash.go
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/ringhash"
"google.golang.org/grpc/serviceconfig"
)

Expand Down Expand Up @@ -65,10 +66,10 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
}

type subConn struct {
addr string
weight uint32
sc balancer.SubConn
logger *grpclog.PrefixLogger
hashKey string
weight uint32
sc balancer.SubConn
logger *grpclog.PrefixLogger

mu sync.RWMutex
// This is the actual state of this SubConn (as updated by the ClientConn).
Expand Down Expand Up @@ -207,6 +208,7 @@ type ringhashBalancer struct {
// - an address was added
// - an address was removed
// - an address's weight was updated
// - an address's hash key was updated
//
// Note that this function doesn't trigger SubConn connecting, so all the new
// SubConn states are Idle.
Expand All @@ -217,6 +219,10 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
for _, addr := range addrs {
addrsSet.Set(addr, true)
newWeight := getWeightAttribute(addr)
newHashKey := ringhash.GetAddrHashKey(addr)
if newHashKey == "" {
newHashKey = addr.Addr
}
if val, ok := b.subConns.Get(addr); !ok {
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
Expand All @@ -228,28 +234,35 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
b.logger.Warningf("Failed to create new SubConn: %v", err)
continue
}
scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
scs := &subConn{hashKey: newHashKey, weight: newWeight, sc: sc}
scs.logger = subConnPrefixLogger(b, scs)
scs.setState(connectivity.Idle)
b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
b.subConns.Set(addr, scs)
b.scStates[sc] = scs
addrsUpdated = true
} else {
// We have seen this address before and created a subConn for it. If the
// weight associated with the address has changed, update the subConns map
// with the new weight. This will be used when a new ring is created.
// We have seen this address before and created a subConn for it. If
// the weight or the hash key associated with the address has
// changed, update the subConns map with the new weight and/or hash
// key. This will be used when a new ring is created.
//
// There is no need to call UpdateAddresses on the subConn at this point
// since *only* the weight attribute has changed, and that does not affect
// subConn uniqueness.
// There is no need to call UpdateAddresses on the subConn at this
// point since *only* the weight and/or hash key attribute has
// changed, and that does not affect subConn uniqueness.
scInfo := val.(*subConn)
if oldWeight := scInfo.weight; oldWeight != newWeight {
scInfo.weight = newWeight
b.subConns.Set(addr, scInfo)
// Return true to force recreation of the ring.
addrsUpdated = true
}
if oldHashKey := scInfo.hashKey; oldHashKey != newHashKey {
scInfo.hashKey = newHashKey
b.subConns.Set(addr, scInfo)
// Return true to force recreation of the ring.
addrsUpdated = true
}
}
}
for _, addr := range b.subConns.Keys() {
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/ringhash/ringhash_test.go
Expand Up @@ -33,7 +33,7 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/resolver/ringhash"
)

var (
Expand Down Expand Up @@ -538,10 +538,10 @@ func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) {
// the pointer is different. This test verifies that subConns are not recreated
// in this scenario.
func (s) TestAddrBalancerAttributesChange(t *testing.T) {
addrs1 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
addrs1 := []resolver.Address{ringhash.SetAddrHashKey(resolver.Address{Addr: testBackendAddrStrs[0]}, "test_key")}
cc, b, _ := setupTest(t, addrs1)

addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
addrs2 := []resolver.Address{ringhash.SetAddrHashKey(resolver.Address{Addr: testBackendAddrStrs[0]}, "test_key")}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs2},
BalancerConfig: testConfig,
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/resolver/serviceconfig.go
Expand Up @@ -27,7 +27,7 @@ import (
"sync/atomic"
"time"

xxhash "github.com/cespare/xxhash/v2"
"github.com/cespare/xxhash/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcutil"
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/resolver/serviceconfig_test.go
Expand Up @@ -23,7 +23,7 @@ import (
"regexp"
"testing"

xxhash "github.com/cespare/xxhash/v2"
"github.com/cespare/xxhash/v2"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/grpcutil"
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/resolver/xds_resolver_test.go
Expand Up @@ -25,7 +25,7 @@ import (
"testing"
"time"

xxhash "github.com/cespare/xxhash/v2"
"github.com/cespare/xxhash/v2"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
Expand Down
1 change: 1 addition & 0 deletions xds/internal/xdsclient/xdsresource/type_eds.go
Expand Up @@ -52,6 +52,7 @@ type Endpoint struct {
Address string
HealthStatus EndpointHealthStatus
Weight uint32
HashKey string
}

// Locality contains information of a locality.
Expand Down
20 changes: 20 additions & 0 deletions xds/internal/xdsclient/xdsresource/unmarshal_eds.go
Expand Up @@ -98,15 +98,35 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs
return nil, fmt.Errorf("duplicate endpoint with the same address %s", addr)
}
uniqueEndpointAddrs[addr] = true

hashKey := hashKey(lbEndpoint)
endpoints = append(endpoints, Endpoint{
HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
Address: addr,
Weight: weight,
HashKey: hashKey,
})
}
return endpoints, nil
}

// hashKey extracts and returns the hash key from the given LbEndpoint. If no
// hash key is found, it returns an empty string.
func hashKey(lbEndpoint *v3endpointpb.LbEndpoint) string {
// "The xDS resolver, described in A74, will be changed to set the hash_key
// endpoint attribute to the value of LbEndpoint.Metadata envoy.lb hash_key
// field, as described in Envoy's documentation for the ring hash load
// balancer." - A76
var hashKey string
envoyLB := lbEndpoint.GetMetadata().GetFilterMetadata()["envoy.lb"]
if envoyLB != nil {
if h := envoyLB.GetFields()["hash_key"]; h != nil {
hashKey = h.GetStringValue()
}
}
return hashKey
}

func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, error) {
ret := EndpointsUpdate{}
for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
Expand Down

0 comments on commit e1bf6bf

Please sign in to comment.