Skip to content

Commit

Permalink
attributes: add Equal method; resolver: add AddressMap and State.Bala…
Browse files Browse the repository at this point in the history
…ncerAttributes (#4855)
  • Loading branch information
dfawley committed Oct 15, 2021
1 parent 3db1cb0 commit 36d8757
Show file tree
Hide file tree
Showing 23 changed files with 594 additions and 217 deletions.
78 changes: 49 additions & 29 deletions attributes/attributes.go
Expand Up @@ -25,55 +25,75 @@
// later release.
package attributes

import "fmt"

// Attributes is an immutable struct for storing and retrieving generic
// key/value pairs. Keys must be hashable, and users should define their own
// types for keys.
// types for keys. Values should not be modified after they are added to an
// Attributes or if they were received from one. If values implement 'Equal(o
// interface{}) bool', it will be called by (*Attributes).Equal to determine
// whether two values with the same key should be considered equal.
type Attributes struct {
m map[interface{}]interface{}
}

// New returns a new Attributes containing all key/value pairs in kvs. If the
// same key appears multiple times, the last value overwrites all previous
// values for that key. Panics if len(kvs) is not even.
func New(kvs ...interface{}) *Attributes {
if len(kvs)%2 != 0 {
panic(fmt.Sprintf("attributes.New called with unexpected input: len(kvs) = %v", len(kvs)))
}
a := &Attributes{m: make(map[interface{}]interface{}, len(kvs)/2)}
for i := 0; i < len(kvs)/2; i++ {
a.m[kvs[i*2]] = kvs[i*2+1]
}
return a
// New returns a new Attributes containing the key/value pair.
func New(key, value interface{}) *Attributes {
return &Attributes{m: map[interface{}]interface{}{key: value}}
}

// WithValues returns a new Attributes containing all key/value pairs in a and
// kvs. Panics if len(kvs) is not even. If the same key appears multiple
// times, the last value overwrites all previous values for that key. To
// remove an existing key, use a nil value.
func (a *Attributes) WithValues(kvs ...interface{}) *Attributes {
// WithValue returns a new Attributes containing the previous keys and values
// and the new key/value pair. If the same key appears multiple times, the
// last value overwrites all previous values for that key. To remove an
// existing key, use a nil value. value should not be modified later.
func (a *Attributes) WithValue(key, value interface{}) *Attributes {
if a == nil {
return New(kvs...)
return New(key, value)
}
if len(kvs)%2 != 0 {
panic(fmt.Sprintf("attributes.New called with unexpected input: len(kvs) = %v", len(kvs)))
}
n := &Attributes{m: make(map[interface{}]interface{}, len(a.m)+len(kvs)/2)}
n := &Attributes{m: make(map[interface{}]interface{}, len(a.m)+1)}
for k, v := range a.m {
n.m[k] = v
}
for i := 0; i < len(kvs)/2; i++ {
n.m[kvs[i*2]] = kvs[i*2+1]
}
n.m[key] = value
return n
}

// Value returns the value associated with these attributes for key, or nil if
// no value is associated with key.
// no value is associated with key. The returned value should not be modified.
func (a *Attributes) Value(key interface{}) interface{} {
if a == nil {
return nil
}
return a.m[key]
}

// Equal returns whether a and o are equivalent. If 'Equal(o interface{})
// bool' is implemented for a value in the attributes, it is called to
// determine if the value matches the one stored in the other attributes. If
// Equal is not implemented, standard equality is used to determine if the two
// values are equal.
func (a *Attributes) Equal(o *Attributes) bool {
if a == nil && o == nil {
return true
}
if a == nil || o == nil {
return false
}
if len(a.m) != len(o.m) {
return false
}
for k, v := range a.m {
ov, ok := o.m[k]
if !ok {
// o missing element of a
return false
}
if eq, ok := v.(interface{ Equal(o interface{}) bool }); ok {
if !eq.Equal(ov) {
return false
}
} else if v != ov {
// Fallback to a standard equality check if Value is unimplemented.
return false
}
}
return true
}
54 changes: 42 additions & 12 deletions attributes/attributes_test.go
Expand Up @@ -20,41 +20,71 @@ package attributes_test

import (
"fmt"
"reflect"
"testing"

"google.golang.org/grpc/attributes"
)

type stringVal struct {
s string
}

func (s stringVal) Equal(o interface{}) bool {
os, ok := o.(stringVal)
return ok && s.s == os.s
}

func ExampleAttributes() {
type keyOne struct{}
type keyTwo struct{}
a := attributes.New(keyOne{}, 1, keyTwo{}, "two")
a := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"})
fmt.Println("Key one:", a.Value(keyOne{}))
fmt.Println("Key two:", a.Value(keyTwo{}))
// Output:
// Key one: 1
// Key two: two
// Key two: {two}
}

func ExampleAttributes_WithValues() {
func ExampleAttributes_WithValue() {
type keyOne struct{}
type keyTwo struct{}
a := attributes.New(keyOne{}, 1)
a = a.WithValues(keyTwo{}, "two")
a = a.WithValue(keyTwo{}, stringVal{s: "two"})
fmt.Println("Key one:", a.Value(keyOne{}))
fmt.Println("Key two:", a.Value(keyTwo{}))
// Output:
// Key one: 1
// Key two: two
// Key two: {two}
}

// Test that two attributes with the same content are `reflect.DeepEqual`.
func TestDeepEqual(t *testing.T) {
// Test that two attributes with the same content are Equal.
func TestEqual(t *testing.T) {
type keyOne struct{}
a1 := attributes.New(keyOne{}, 1)
a2 := attributes.New(keyOne{}, 1)
if !reflect.DeepEqual(a1, a2) {
t.Fatalf("reflect.DeepEqual(%+v, %+v), want true, got false", a1, a2)
type keyTwo struct{}
a1 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"})
a2 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"})
if !a1.Equal(a2) {
t.Fatalf("%+v.Equals(%+v) = false; want true", a1, a2)
}
if !a2.Equal(a1) {
t.Fatalf("%+v.Equals(%+v) = false; want true", a2, a1)
}
}

// Test that two attributes with different content are not Equal.
func TestNotEqual(t *testing.T) {
type keyOne struct{}
type keyTwo struct{}
a1 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"})
a2 := attributes.New(keyOne{}, 2).WithValue(keyTwo{}, stringVal{s: "two"})
a3 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "one"})
if a1.Equal(a2) {
t.Fatalf("%+v.Equals(%+v) = true; want false", a1, a2)
}
if a2.Equal(a1) {
t.Fatalf("%+v.Equals(%+v) = true; want false", a2, a1)
}
if a3.Equal(a1) {
t.Fatalf("%+v.Equals(%+v) = true; want false", a3, a1)
}
}
67 changes: 18 additions & 49 deletions balancer/base/balancer.go
Expand Up @@ -22,7 +22,6 @@ import (
"errors"
"fmt"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
Expand All @@ -42,7 +41,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
cc: cc,
pickerBuilder: bb.pickerBuilder,

subConns: make(map[resolver.Address]subConnInfo),
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
Expand All @@ -58,19 +57,14 @@ func (bb *baseBuilder) Name() string {
return bb.name
}

type subConnInfo struct {
subConn balancer.SubConn
attrs *attributes.Attributes
}

type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder

csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State

subConns map[resolver.Address]subConnInfo // `attributes` is stripped from the keys of this map (the addresses)
subConns *resolver.AddressMap
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
config Config
Expand All @@ -81,7 +75,7 @@ type baseBalancer struct {

func (b *baseBalancer) ResolverError(err error) {
b.resolverErr = err
if len(b.subConns) == 0 {
if b.subConns.Len() == 0 {
b.state = connectivity.TransientFailure
}

Expand All @@ -105,57 +99,32 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
addrsSet := resolver.NewAddressMap()
for _, a := range s.ResolverState.Addresses {
// Strip attributes from addresses before using them as map keys. So
// that when two addresses only differ in attributes pointers (but with
// the same attribute content), they are considered the same address.
//
// Note that this doesn't handle the case where the attribute content is
// different. So if users want to set different attributes to create
// duplicate connections to the same backend, it doesn't work. This is
// fine for now, because duplicate is done by setting Metadata today.
//
// TODO: read attributes to handle duplicate connections.
aNoAttrs := a
aNoAttrs.Attributes = nil
addrsSet[aNoAttrs] = struct{}{}
if scInfo, ok := b.subConns[aNoAttrs]; !ok {
addrsSet.Set(a, nil)
if _, ok := b.subConns.Get(a); !ok {
// a is a new address (not existing in b.subConns).
//
// When creating SubConn, the original address with attributes is
// passed through. So that connection configurations in attributes
// (like creds) will be used.
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes}
b.subConns.Set(a, sc)
b.scStates[sc] = connectivity.Idle
b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
} else {
// Always update the subconn's address in case the attributes
// changed.
//
// The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same
// as the old one (including attributes).
scInfo.attrs = a.Attributes
b.subConns[aNoAttrs] = scInfo
b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a})
}
}
for a, scInfo := range b.subConns {
b.subConns.Range(func(a resolver.Address, sci interface{}) {
sc := sci.(balancer.SubConn)
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
b.cc.RemoveSubConn(scInfo.subConn)
delete(b.subConns, a)
if _, ok := addrsSet.Get(a); !ok {
b.cc.RemoveSubConn(sc)
b.subConns.Delete(a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
}
}
})
// If resolver state contains no addresses, return an error so ClientConn
// will trigger re-resolve. Also records this as an resolver error, so when
// the overall state turns transient failure, the error message will have
Expand Down Expand Up @@ -193,12 +162,12 @@ func (b *baseBalancer) regeneratePicker() {
readySCs := make(map[balancer.SubConn]SubConnInfo)

// Filter out all ready SCs from full subConn map.
for addr, scInfo := range b.subConns {
if st, ok := b.scStates[scInfo.subConn]; ok && st == connectivity.Ready {
addr.Attributes = scInfo.attrs
readySCs[scInfo.subConn] = SubConnInfo{Address: addr}
b.subConns.Range(func(addr resolver.Address, sci interface{}) {
sc := sci.(balancer.SubConn)
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[sc] = SubConnInfo{Address: addr}
}
}
})
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}

Expand Down
28 changes: 0 additions & 28 deletions balancer/base/balancer_test.go
Expand Up @@ -54,34 +54,6 @@ func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker {
return nil
}

func TestBaseBalancerStripAttributes(t *testing.T) {
b := (&baseBuilder{}).Build(&testClientConn{
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
for _, addr := range addrs {
if addr.Attributes == nil {
t.Errorf("in NewSubConn, got address %+v with nil attributes, want not nil", addr)
}
}
return &testSubConn{}, nil
},
}, balancer.BuildOptions{}).(*baseBalancer)

b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: []resolver.Address{
{Addr: "1.1.1.1", Attributes: &attributes.Attributes{}},
{Addr: "2.2.2.2", Attributes: &attributes.Attributes{}},
},
},
})

for addr := range b.subConns {
if addr.Attributes != nil {
t.Errorf("in b.subConns, got address %+v with not nil attributes, want nil", addr)
}
}
}

func TestBaseBalancerReserveAttributes(t *testing.T) {
var v = func(info PickerBuildInfo) {
for _, sc := range info.ReadySCs {
Expand Down
2 changes: 1 addition & 1 deletion balancer/grpclb/state/state.go
Expand Up @@ -39,7 +39,7 @@ type State struct {
// Set returns a copy of the provided state with attributes containing s. s's
// data should not be mutated after calling Set.
func Set(state resolver.State, s *State) resolver.State {
state.Attributes = state.Attributes.WithValues(key, s)
state.Attributes = state.Attributes.WithValue(key, s)
return state
}

Expand Down

0 comments on commit 36d8757

Please sign in to comment.