-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
balancer: set RPC metadata in address attributes, instead of Metadata field #4041
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -125,7 +125,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer | |
return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs)) | ||
} | ||
addrWithoutMD := addrs[0] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
addrWithoutMD.Metadata = nil | ||
addrWithoutMD.Attributes = nil | ||
|
||
ccc.mu.Lock() | ||
defer ccc.mu.Unlock() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* | ||
* Copyright 2020 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
// Package metadata contains functions to set and get metadata from addresses. | ||
// | ||
// This package is experimental. | ||
package metadata | ||
|
||
import ( | ||
"google.golang.org/grpc/attributes" | ||
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/resolver" | ||
) | ||
|
||
type mdKeyType string | ||
|
||
const mdKey = mdKeyType("grpc.internal.address.metadata") | ||
|
||
// Get returns the metadata of addr. | ||
func Get(addr resolver.Address) metadata.MD { | ||
attrs := addr.Attributes | ||
if attrs == nil { | ||
return nil | ||
} | ||
md, ok := attrs.Value(mdKey).(metadata.MD) | ||
if !ok { | ||
return nil | ||
} | ||
return md | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need |
||
} | ||
|
||
// Set sets (overrides) the metadata in addr. | ||
// | ||
// When a SubConn is created with this address, the RPCs sent on it will all | ||
// have this metadata. | ||
func Set(addr resolver.Address, md metadata.MD) resolver.Address { | ||
if addr.Attributes == nil { | ||
addr.Attributes = attributes.New(mdKey, md) | ||
return addr | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be deleted, as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
addr.Attributes = addr.Attributes.WithValues(mdKey, md) | ||
return addr | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
/* | ||
* | ||
* Copyright 2020 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
package metadata | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/google/go-cmp/cmp" | ||
"google.golang.org/grpc/attributes" | ||
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/resolver" | ||
) | ||
|
||
func TestGet(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
addr resolver.Address | ||
want metadata.MD | ||
}{ | ||
{ | ||
name: "not set", | ||
addr: resolver.Address{}, | ||
want: nil, | ||
}, | ||
{ | ||
name: "not set", | ||
addr: resolver.Address{ | ||
Attributes: attributes.New(mdKey, metadata.Pairs("k", "v")), | ||
}, | ||
want: metadata.Pairs("k", "v"), | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
if got := Get(tt.addr); !cmp.Equal(got, tt.want) { | ||
t.Errorf("Get() = %v, want %v", got, tt.want) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestSet(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
addr resolver.Address | ||
md metadata.MD | ||
}{ | ||
{ | ||
name: "unset before", | ||
addr: resolver.Address{}, | ||
md: metadata.Pairs("k", "v"), | ||
}, | ||
{ | ||
name: "set before", | ||
addr: resolver.Address{ | ||
Attributes: attributes.New(mdKey, metadata.Pairs("bef", "ore")), | ||
}, | ||
md: metadata.Pairs("k", "v"), | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
newAddr := Set(tt.addr, tt.md) | ||
newMD := Get(newAddr) | ||
if !cmp.Equal(newMD, tt.md) { | ||
t.Errorf("md after Set() = %v, want %v", newMD, tt.md) | ||
} | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,7 @@ import ( | |
"google.golang.org/grpc/internal/balancerload" | ||
"google.golang.org/grpc/internal/grpcsync" | ||
"google.golang.org/grpc/internal/grpcutil" | ||
imetadata "google.golang.org/grpc/internal/metadata" | ||
"google.golang.org/grpc/internal/testutils" | ||
"google.golang.org/grpc/metadata" | ||
"google.golang.org/grpc/resolver" | ||
|
@@ -543,6 +544,95 @@ func (s) TestAddressAttributesInNewSubConn(t *testing.T) { | |
} | ||
} | ||
|
||
// TestMetadataInAddressAttributes verifies that the metadata added to | ||
// address.Attributes will be sent with the RPCs. | ||
func (s) TestMetadataInAddressAttributes(t *testing.T) { | ||
const ( | ||
testMDKey = "test-md" | ||
testMDValue = "test-md-value" | ||
mdBalancerName = "metadata-balancer" | ||
) | ||
|
||
// Register a stub balancer which adds metadata to the first address that it | ||
// receives and then calls NewSubConn on it. | ||
bf := stub.BalancerFuncs{ | ||
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { | ||
addrs := ccs.ResolverState.Addresses | ||
if len(addrs) == 0 { | ||
return nil | ||
} | ||
|
||
// Only use the first address. | ||
sc, err := bd.ClientConn.NewSubConn([]resolver.Address{ | ||
imetadata.Set(addrs[0], metadata.Pairs(testMDKey, testMDValue)), | ||
}, balancer.NewSubConnOptions{}) | ||
if err != nil { | ||
return err | ||
} | ||
sc.Connect() | ||
return nil | ||
}, | ||
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { | ||
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: state.ConnectivityState, Picker: &aiPicker{result: balancer.PickResult{SubConn: sc}, err: state.ConnectionError}}) | ||
}, | ||
} | ||
stub.Register(mdBalancerName, bf) | ||
t.Logf("Registered balancer %s...", mdBalancerName) | ||
|
||
r := manual.NewBuilderWithScheme("whatever") | ||
t.Logf("Registered manual resolver with scheme %s...", r.Scheme()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this registering? It doesn't need to ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not registered. So we will need |
||
|
||
testMDChan := make(chan []string, 1) | ||
ss := &stubServer{ | ||
emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { | ||
md, ok := metadata.FromIncomingContext(ctx) | ||
if ok { | ||
select { | ||
case <-testMDChan: | ||
default: | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you need this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. Made it a switch with context.Done(). |
||
testMDChan <- md[testMDKey] | ||
} | ||
return &testpb.Empty{}, nil | ||
}, | ||
} | ||
if err := ss.Start(nil); err != nil { | ||
t.Fatalf("Error starting endpoint server: %v", err) | ||
} | ||
defer ss.Stop() | ||
|
||
dopts := []grpc.DialOption{ | ||
grpc.WithInsecure(), | ||
grpc.WithResolvers(r), | ||
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, mdBalancerName)), | ||
} | ||
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer cc.Close() | ||
tc := testpb.NewTestServiceClient(cc) | ||
t.Log("Created a ClientConn...") | ||
|
||
state := resolver.State{Addresses: []resolver.Address{{Addr: ss.address}}} | ||
r.UpdateState(state) | ||
t.Logf("Pushing resolver state update: %v through the manual resolver", state) | ||
|
||
// The RPC should succeed with the expected md. | ||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | ||
defer cancel() | ||
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { | ||
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) | ||
} | ||
t.Log("Made an RPC which succeeded...") | ||
|
||
// The server should receive the test metadata. | ||
md1 := <-testMDChan | ||
if len(md1) == 0 || md1[0] != testMDValue { | ||
t.Fatalf("got md: %v, want %v", md1, []string{testMDValue}) | ||
} | ||
} | ||
|
||
// TestServersSwap creates two servers and verifies the client switches between | ||
// them when the name resolver reports the first and then the second. | ||
func (s) TestServersSwap(t *testing.T) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/MD/Attributes/ (or Attrs)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done