Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer: set RPC metadata in address attributes, instead of Metadata field #4041

Merged
merged 3 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,19 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
addrsSet := make(map[resolver.Address]struct{})
// Create new SubConns.
for _, addr := range backendAddrs {
addrWithoutMD := addr
addrWithoutMD.Attributes = nil
addrsSet[addrWithoutMD] = struct{}{}
lb.backendAddrsWithoutMetadata = append(lb.backendAddrsWithoutMetadata, addrWithoutMD)
addrWithoutAttrs := addr
addrWithoutAttrs.Attributes = nil
addrsSet[addrWithoutAttrs] = struct{}{}
lb.backendAddrsWithoutMetadata = append(lb.backendAddrsWithoutMetadata, addrWithoutAttrs)

if _, ok := lb.subConns[addrWithoutMD]; !ok {
if _, ok := lb.subConns[addrWithoutAttrs]; !ok {
// Use addrWithMD to create the SubConn.
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
if err != nil {
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
continue
}
lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
lb.subConns[addrWithoutAttrs] = sc // Use the addr without MD as key for the map.
if _, ok := lb.scStates[sc]; !ok {
// Only set state of new sc to IDLE. The state could already be
// READY for cached SubConns.
Expand Down
10 changes: 5 additions & 5 deletions balancer/grpclb/grpclb_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,16 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
if len(addrs) != 1 {
return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs))
}
addrWithoutMD := addrs[0]
addrWithoutMD.Attributes = nil
addrWithoutAttrs := addrs[0]
addrWithoutAttrs.Attributes = nil

ccc.mu.Lock()
defer ccc.mu.Unlock()
if entry, ok := ccc.subConnCache[addrWithoutMD]; ok {
if entry, ok := ccc.subConnCache[addrWithoutAttrs]; ok {
// If entry is in subConnCache, the SubConn was being deleted.
// cancel function will never be nil.
entry.cancel()
delete(ccc.subConnCache, addrWithoutMD)
delete(ccc.subConnCache, addrWithoutAttrs)
return entry.sc, nil
}

Expand All @@ -142,7 +142,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
return nil, err
}

ccc.subConnToAddr[scNew] = addrWithoutMD
ccc.subConnToAddr[scNew] = addrWithoutAttrs
return scNew, nil
}

Expand Down
10 changes: 1 addition & 9 deletions internal/hierarchy/hierarchy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
package hierarchy

import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)

Expand All @@ -37,19 +36,12 @@ func Get(addr resolver.Address) []string {
if attrs == nil {
return nil
}
path, ok := attrs.Value(pathKey).([]string)
if !ok {
return nil
}
path, _ := attrs.Value(pathKey).([]string)
return path
}

// Set overrides the hierarchical path in addr with path.
func Set(addr resolver.Address, path []string) resolver.Address {
if addr.Attributes == nil {
addr.Attributes = attributes.New(pathKey, path)
return addr
}
addr.Attributes = addr.Attributes.WithValues(pathKey, path)
return addr
}
Expand Down
10 changes: 1 addition & 9 deletions internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
package metadata

import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
Expand All @@ -37,10 +36,7 @@ func Get(addr resolver.Address) metadata.MD {
if attrs == nil {
return nil
}
md, ok := attrs.Value(mdKey).(metadata.MD)
if !ok {
return nil
}
md, _ := attrs.Value(mdKey).(metadata.MD)
return md
}

Expand All @@ -49,10 +45,6 @@ func Get(addr resolver.Address) metadata.MD {
// When a SubConn is created with this address, the RPCs sent on it will all
// have this metadata.
func Set(addr resolver.Address, md metadata.MD) resolver.Address {
if addr.Attributes == nil {
addr.Attributes = attributes.New(mdKey, md)
return addr
}
addr.Attributes = addr.Attributes.WithValues(mdKey, md)
return addr
}
10 changes: 4 additions & 6 deletions test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,19 +579,16 @@ func (s) TestMetadataInAddressAttributes(t *testing.T) {
stub.Register(mdBalancerName, bf)
t.Logf("Registered balancer %s...", mdBalancerName)

r := manual.NewBuilderWithScheme("whatever")
t.Logf("Registered manual resolver with scheme %s...", r.Scheme())

testMDChan := make(chan []string, 1)
ss := &stubServer{
emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
md, ok := metadata.FromIncomingContext(ctx)
if ok {
select {
case <-testMDChan:
default:
case testMDChan <- md[testMDKey]:
case <-ctx.Done():
return nil, ctx.Err()
}
testMDChan <- md[testMDKey]
}
return &testpb.Empty{}, nil
},
Expand All @@ -601,6 +598,7 @@ func (s) TestMetadataInAddressAttributes(t *testing.T) {
}
defer ss.Stop()

r := manual.NewBuilderWithScheme("whatever")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you know? You can inject this into the stubserver and Dial will use it:

grpc-go/test/end2end_test.go

Lines 5197 to 5200 in 230166b

if ss.r != nil {
ss.target = ss.r.Scheme() + ":///" + ss.address
opts = append(opts, grpc.WithResolvers(ss.r))
}

Maybe we should delete that feature since it seems to be well hidden and nonobvious? Or you should use it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I knew it. But you cannot set balancer/init service config.
I didn't want to add all the fields just to pass the dial options.

I'm OK with removing it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing where your test needs to set balancer/init service config, though. You are using a default service config, and then doing the same resolver update that stubServer would do automatically.

Copy link
Contributor Author

@menghanl menghanl Nov 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm dumb. I need to set the balancer in init service config, to use the balancer that sets the attributes. But ss.Start works. Fixed now. (But github hates me?)

dopts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithResolvers(r),
Expand Down