Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

attributes: add Equal method; resolver: add AddressMap and State.BalancerAttributes #4855

Merged
merged 7 commits into from Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 49 additions & 29 deletions attributes/attributes.go
Expand Up @@ -25,55 +25,75 @@
// later release.
package attributes

import "fmt"

// Attributes is an immutable struct for storing and retrieving generic
// key/value pairs. Keys must be hashable, and users should define their own
// types for keys.
// types for keys. Values should not be modified after they are added to an
// Attributes or if they were received from one. If values implement 'Equal(o
// interface{}) bool', it will be called by (*Attributes).Equal to determine
// whether two values with the same key should be considered equal.
type Attributes struct {
m map[interface{}]interface{}
}

// New returns a new Attributes containing all key/value pairs in kvs. If the
// same key appears multiple times, the last value overwrites all previous
// values for that key. Panics if len(kvs) is not even.
func New(kvs ...interface{}) *Attributes {
if len(kvs)%2 != 0 {
panic(fmt.Sprintf("attributes.New called with unexpected input: len(kvs) = %v", len(kvs)))
}
a := &Attributes{m: make(map[interface{}]interface{}, len(kvs)/2)}
for i := 0; i < len(kvs)/2; i++ {
a.m[kvs[i*2]] = kvs[i*2+1]
}
return a
// New returns a new Attributes containing the key/value pair.
func New(key, value interface{}) *Attributes {
return &Attributes{m: map[interface{}]interface{}{key: value}}
}

// WithValues returns a new Attributes containing all key/value pairs in a and
// kvs. Panics if len(kvs) is not even. If the same key appears multiple
// times, the last value overwrites all previous values for that key. To
// remove an existing key, use a nil value.
func (a *Attributes) WithValues(kvs ...interface{}) *Attributes {
// WithValue returns a new Attributes containing the previous keys and values
// and the new key/value pair. If the same key appears multiple times, the
// last value overwrites all previous values for that key. To remove an
// existing key, use a nil value. value should not be modified later.
func (a *Attributes) WithValue(key, value interface{}) *Attributes {
if a == nil {
return New(kvs...)
return New(key, value)
}
if len(kvs)%2 != 0 {
panic(fmt.Sprintf("attributes.New called with unexpected input: len(kvs) = %v", len(kvs)))
}
n := &Attributes{m: make(map[interface{}]interface{}, len(a.m)+len(kvs)/2)}
n := &Attributes{m: make(map[interface{}]interface{}, len(a.m)+1)}
for k, v := range a.m {
n.m[k] = v
}
for i := 0; i < len(kvs)/2; i++ {
n.m[kvs[i*2]] = kvs[i*2+1]
}
n.m[key] = value
return n
}

// Value returns the value associated with these attributes for key, or nil if
// no value is associated with key.
// no value is associated with key. The returned value should not be modified.
func (a *Attributes) Value(key interface{}) interface{} {
if a == nil {
return nil
}
return a.m[key]
}

// Equal returns whether a and o are equivalent. If 'Equal(o interface{})
// bool' is implemented for a value in the attributes, it is called to
// determine if the value matches the one stored in the other attributes. If
// Equal is not implemented, standard equality is used to determine if the two
// values are equal.
func (a *Attributes) Equal(o *Attributes) bool {
if a == nil && o == nil {
return true
}
if a == nil || o == nil {
return false
}
if len(a.m) != len(o.m) {
return false
}
for k, v := range a.m {
ov, ok := o.m[k]
if !ok {
// o missing element of a
return false
}
if eq, ok := v.(interface{ Equal(o interface{}) bool }); ok {
if !eq.Equal(ov) {
return false
}
} else if v != ov {
// Fallback to a standard equality check if Value is unimplemented.
return false
}
}
return true
}
54 changes: 42 additions & 12 deletions attributes/attributes_test.go
Expand Up @@ -20,41 +20,71 @@ package attributes_test

import (
"fmt"
"reflect"
"testing"

"google.golang.org/grpc/attributes"
)

type stringVal struct {
s string
}

func (s stringVal) Equal(o interface{}) bool {
menghanl marked this conversation as resolved.
Show resolved Hide resolved
os, ok := o.(stringVal)
return ok && s.s == os.s
}

func ExampleAttributes() {
type keyOne struct{}
type keyTwo struct{}
a := attributes.New(keyOne{}, 1, keyTwo{}, "two")
a := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"})
fmt.Println("Key one:", a.Value(keyOne{}))
fmt.Println("Key two:", a.Value(keyTwo{}))
// Output:
// Key one: 1
// Key two: two
// Key two: {two}
}

func ExampleAttributes_WithValues() {
func ExampleAttributes_WithValue() {
type keyOne struct{}
type keyTwo struct{}
a := attributes.New(keyOne{}, 1)
a = a.WithValues(keyTwo{}, "two")
a = a.WithValue(keyTwo{}, stringVal{s: "two"})
fmt.Println("Key one:", a.Value(keyOne{}))
fmt.Println("Key two:", a.Value(keyTwo{}))
// Output:
// Key one: 1
// Key two: two
// Key two: {two}
}

// Test that two attributes with the same content are `reflect.DeepEqual`.
func TestDeepEqual(t *testing.T) {
// Test that two attributes with the same content are Equal.
func TestEqual(t *testing.T) {
type keyOne struct{}
a1 := attributes.New(keyOne{}, 1)
a2 := attributes.New(keyOne{}, 1)
if !reflect.DeepEqual(a1, a2) {
t.Fatalf("reflect.DeepEqual(%+v, %+v), want true, got false", a1, a2)
type keyTwo struct{}
a1 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"})
a2 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"})
if !a1.Equal(a2) {
t.Fatalf("%+v.Equals(%+v) = false; want true", a1, a2)
}
if !a2.Equal(a1) {
t.Fatalf("%+v.Equals(%+v) = false; want true", a2, a1)
}
}

// Test that two attributes with different content are not Equal.
func TestNotEqual(t *testing.T) {
type keyOne struct{}
type keyTwo struct{}
a1 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"})
a2 := attributes.New(keyOne{}, 2).WithValue(keyTwo{}, stringVal{s: "two"})
a3 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "one"})
if a1.Equal(a2) {
t.Fatalf("%+v.Equals(%+v) = true; want false", a1, a2)
}
if a2.Equal(a1) {
t.Fatalf("%+v.Equals(%+v) = true; want false", a2, a1)
}
if a3.Equal(a1) {
t.Fatalf("%+v.Equals(%+v) = true; want false", a3, a1)
}
}
67 changes: 18 additions & 49 deletions balancer/base/balancer.go
Expand Up @@ -22,7 +22,6 @@ import (
"errors"
"fmt"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
Expand All @@ -42,7 +41,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
cc: cc,
pickerBuilder: bb.pickerBuilder,

subConns: make(map[resolver.Address]subConnInfo),
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
Expand All @@ -58,19 +57,14 @@ func (bb *baseBuilder) Name() string {
return bb.name
}

type subConnInfo struct {
subConn balancer.SubConn
attrs *attributes.Attributes
}

type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder

csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State

subConns map[resolver.Address]subConnInfo // `attributes` is stripped from the keys of this map (the addresses)
subConns *resolver.AddressMap
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
config Config
Expand All @@ -81,7 +75,7 @@ type baseBalancer struct {

func (b *baseBalancer) ResolverError(err error) {
b.resolverErr = err
if len(b.subConns) == 0 {
if b.subConns.Len() == 0 {
b.state = connectivity.TransientFailure
}

Expand All @@ -105,57 +99,32 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
addrsSet := resolver.NewAddressMap()
for _, a := range s.ResolverState.Addresses {
// Strip attributes from addresses before using them as map keys. So
// that when two addresses only differ in attributes pointers (but with
// the same attribute content), they are considered the same address.
//
// Note that this doesn't handle the case where the attribute content is
// different. So if users want to set different attributes to create
// duplicate connections to the same backend, it doesn't work. This is
// fine for now, because duplicate is done by setting Metadata today.
//
// TODO: read attributes to handle duplicate connections.
aNoAttrs := a
aNoAttrs.Attributes = nil
addrsSet[aNoAttrs] = struct{}{}
if scInfo, ok := b.subConns[aNoAttrs]; !ok {
addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok {
// a is a new address (not existing in b.subConns).
//
// When creating SubConn, the original address with attributes is
// passed through. So that connection configurations in attributes
// (like creds) will be used.
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes}
b.subConns.Set(a, sc)
b.scStates[sc] = connectivity.Idle
b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
} else {
// Always update the subconn's address in case the attributes
// changed.
//
// The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same
// as the old one (including attributes).
scInfo.attrs = a.Attributes
b.subConns[aNoAttrs] = scInfo
b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a})
}
}
for a, scInfo := range b.subConns {
b.subConns.Range(func(a resolver.Address, sci interface{}) {
sc := sci.(balancer.SubConn)
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
b.cc.RemoveSubConn(scInfo.subConn)
delete(b.subConns, a)
if _, ok := addrsSet.Get(a); !ok {
b.cc.RemoveSubConn(sc)
b.subConns.Delete(a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
}
}
})
// If resolver state contains no addresses, return an error so ClientConn
// will trigger re-resolve. Also records this as an resolver error, so when
// the overall state turns transient failure, the error message will have
Expand Down Expand Up @@ -193,12 +162,12 @@ func (b *baseBalancer) regeneratePicker() {
readySCs := make(map[balancer.SubConn]SubConnInfo)

// Filter out all ready SCs from full subConn map.
for addr, scInfo := range b.subConns {
if st, ok := b.scStates[scInfo.subConn]; ok && st == connectivity.Ready {
addr.Attributes = scInfo.attrs
readySCs[scInfo.subConn] = SubConnInfo{Address: addr}
b.subConns.Range(func(addr resolver.Address, sci interface{}) {
sc := sci.(balancer.SubConn)
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[sc] = SubConnInfo{Address: addr}
}
}
})
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}

Expand Down
28 changes: 0 additions & 28 deletions balancer/base/balancer_test.go
Expand Up @@ -54,34 +54,6 @@ func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker {
return nil
}

func TestBaseBalancerStripAttributes(t *testing.T) {
b := (&baseBuilder{}).Build(&testClientConn{
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
for _, addr := range addrs {
if addr.Attributes == nil {
t.Errorf("in NewSubConn, got address %+v with nil attributes, want not nil", addr)
}
}
return &testSubConn{}, nil
},
}, balancer.BuildOptions{}).(*baseBalancer)

b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: []resolver.Address{
{Addr: "1.1.1.1", Attributes: &attributes.Attributes{}},
{Addr: "2.2.2.2", Attributes: &attributes.Attributes{}},
},
},
})

for addr := range b.subConns {
if addr.Attributes != nil {
t.Errorf("in b.subConns, got address %+v with not nil attributes, want nil", addr)
}
}
}

func TestBaseBalancerReserveAttributes(t *testing.T) {
var v = func(info PickerBuildInfo) {
for _, sc := range info.ReadySCs {
Expand Down
2 changes: 1 addition & 1 deletion balancer/grpclb/state/state.go
Expand Up @@ -39,7 +39,7 @@ type State struct {
// Set returns a copy of the provided state with attributes containing s. s's
// data should not be mutated after calling Set.
func Set(state resolver.State, s *State) resolver.State {
state.Attributes = state.Attributes.WithValues(key, s)
state.Attributes = state.Attributes.WithValue(key, s)
dfawley marked this conversation as resolved.
Show resolved Hide resolved
return state
}

Expand Down