Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Add RLS in xDS e2e test #5281

Merged
merged 5 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
79 changes: 40 additions & 39 deletions balancer/rls/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
rlstest "google.golang.org/grpc/internal/testutils/rls"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
Expand All @@ -48,10 +49,10 @@ import (
// and the old one is closed.
func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
// Start two RLS servers.
lis1 := newListenerWrapper(t, nil)
rlsServer1, rlsReqCh1 := setupFakeRLSServer(t, lis1)
lis2 := newListenerWrapper(t, nil)
rlsServer2, rlsReqCh2 := setupFakeRLSServer(t, lis2)
lis1 := testutils.NewListenerWrapper(t, nil)
rlsServer1, rlsReqCh1 := rlstest.SetupFakeRLSServer(t, lis1)
lis2 := testutils.NewListenerWrapper(t, nil)
rlsServer2, rlsReqCh2 := rlstest.SetupFakeRLSServer(t, lis2)

// Build RLS service config with the RLS server pointing to the first one.
// Set a very low value for maxAge to ensure that the entry expires soon.
Expand All @@ -61,12 +62,12 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
// Start a couple of test backends, and set up the fake RLS servers to return
// these as a target in the RLS response.
backendCh1, backendAddress1 := startBackend(t)
rlsServer1.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
rlsServer1.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
})
backendCh2, backendAddress2 := startBackend(t)
rlsServer2.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
rlsServer2.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
})

// Register a manual resolver and push the RLS service config through it.
Expand All @@ -84,11 +85,11 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, backendCh1)

// Ensure a connection is established to the first RLS server.
val, err := lis1.newConnCh.Receive(ctx)
val, err := lis1.NewConnCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout expired when waiting for LB policy to create control channel")
}
conn1 := val.(*connWrapper)
conn1 := val.(*testutils.ConnWrapper)

// Make sure an RLS request is sent out.
verifyRLSRequest(t, rlsReqCh1, true)
Expand All @@ -105,12 +106,12 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
r.UpdateState(resolver.State{ServiceConfig: sc})

// Ensure a connection is established to the second RLS server.
if _, err := lis2.newConnCh.Receive(ctx); err != nil {
if _, err := lis2.NewConnCh.Receive(ctx); err != nil {
t.Fatal("Timeout expired when waiting for LB policy to create control channel")
}

// Ensure the connection to the old one is closed.
if _, err := conn1.closeCh.Receive(ctx); err != nil {
if _, err := conn1.CloseCh.Receive(ctx); err != nil {
t.Fatal("Timeout expired when waiting for LB policy to close control channel")
}

Expand All @@ -136,8 +137,8 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
}

// Start an RLS server with the wrapped listener and credentials.
lis := newListenerWrapper(t, nil)
rlsServer, rlsReqCh := setupFakeRLSServer(t, lis, grpc.Creds(serverCreds))
lis := testutils.NewListenerWrapper(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis, grpc.Creds(serverCreds))
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Build RLS service config.
Expand All @@ -147,8 +148,8 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
// and set up the fake RLS server to return this as the target in the RLS
// response.
backendCh, backendAddress := startBackend(t, grpc.Creds(serverCreds))
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
Expand All @@ -173,7 +174,7 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
verifyRLSRequest(t, rlsReqCh, true)

// Ensure a connection is established to the first RLS server.
if _, err := lis.newConnCh.Receive(ctx); err != nil {
if _, err := lis.NewConnCh.Receive(ctx); err != nil {
t.Fatal("Timeout expired when waiting for LB policy to create control channel")
}
}
Expand All @@ -184,7 +185,7 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
// provided service config is applied for the control channel.
func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Register a balancer to be used for the control channel, and set up a
Expand All @@ -211,8 +212,8 @@ func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) {
// Start a test backend, and set up the fake RLS server to return this as a
// target in the RLS response.
backendCh, backendAddress := startBackend(t)
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
Expand Down Expand Up @@ -244,7 +245,7 @@ func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) {
// target after the config has been applied.
func (s) TestConfigUpdate_DefaultTarget(t *testing.T) {
// Start an RLS server and set the throttler to always throttle requests.
rlsServer, _ := setupFakeRLSServer(t, nil)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())

// Build RLS service config with a default target.
Expand Down Expand Up @@ -284,16 +285,16 @@ func (s) TestConfigUpdate_DefaultTarget(t *testing.T) {
// child policy configuration are propagated correctly.
func (s) TestConfigUpdate_ChildPolicyConfigs(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Start a default backend and a test backend.
_, defBackendAddress := startBackend(t)
testBackendCh, testBackendAddress := startBackend(t)

// Set up the RLS server to respond with the test backend.
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
})

// Set up a test balancer callback to push configs received by child policies.
Expand Down Expand Up @@ -411,7 +412,7 @@ func (s) TestConfigUpdate_ChildPolicyConfigs(t *testing.T) {
// handled by closing the old balancer and creating a new one.
func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := setupFakeRLSServer(t, nil)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Set up balancer callbacks.
Expand Down Expand Up @@ -507,14 +508,14 @@ func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) {
// the caller of the RPC.
func (s) TestConfigUpdate_BadChildPolicyConfigs(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Set up the RLS server to respond with a bad target field which is expected
// to cause the child policy's ParseTarget to fail and should result in the LB
// policy creating a lame child policy wrapper.
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{e2e.RLSChildPolicyBadTarget}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{e2e.RLSChildPolicyBadTarget}}}
})

// Build RLS service config with a default target. This default backend is
Expand Down Expand Up @@ -567,7 +568,7 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
defer func() { minEvictDuration = origMinEvictDuration }()

// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Register an LB policy to act as the child policy for RLS LB policy.
Expand All @@ -582,14 +583,14 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
// these as targets in the RLS response, based on request keys.
backendCh1, backendAddress1 := startBackend(t)
backendCh2, backendAddress2 := startBackend(t)
rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
if req.KeyMap["k1"] == "v1" {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
}
if req.KeyMap["k2"] == "v2" {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
}
return &e2e.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
})

// Register a manual resolver and push the RLS service config through it.
Expand Down Expand Up @@ -661,7 +662,7 @@ func (s) TestDataCachePurging(t *testing.T) {
defer func() { dataCachePurgeHook = origDataCachePurgeHook }()

// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Register an LB policy to act as the child policy for RLS LB policy.
Expand All @@ -678,8 +679,8 @@ func (s) TestDataCachePurging(t *testing.T) {
// Start a test backend, and set up the fake RLS server to return this as a
// target in the RLS response.
backendCh, backendAddress := startBackend(t)
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
Expand Down Expand Up @@ -740,7 +741,7 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {

// Start an RLS server with the restartable listener and set the throttler to
// never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, lis)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Override the reset backoff hook to get notified.
Expand Down Expand Up @@ -769,8 +770,8 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
// Start a test backend, and set up the fake RLS server to return this as a
// target in the RLS response.
backendCh, backendAddress := startBackend(t)
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
Expand Down
22 changes: 11 additions & 11 deletions balancer/rls/control_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/rls/internal/test/e2e"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
rlstest "google.golang.org/grpc/internal/testutils/rls"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/testdata"
Expand All @@ -47,7 +47,7 @@ import (
// indicates that the control channel needs to be throttled.
func (s) TestControlChannelThrottled(t *testing.T) {
// Start an RLS server and set the throttler to always throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())

// Create a control channel to the fake RLS server.
Expand All @@ -70,12 +70,12 @@ func (s) TestControlChannelThrottled(t *testing.T) {
// TestLookupFailure tests the case where the RLS server responds with an error.
func (s) TestLookupFailure(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := setupFakeRLSServer(t, nil)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Setup the RLS server to respond with errors.
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Err: errors.New("rls failure")}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Err: errors.New("rls failure")}
})

// Create a control channel to the fake RLS server.
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s) TestLookupDeadlineExceeded(t *testing.T) {
}

// Start an RLS server and set the throttler to never throttle.
rlsServer, _ := setupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Create a control channel with a small deadline.
Expand Down Expand Up @@ -246,7 +246,7 @@ var (
Reason: rlspb.RouteLookupRequest_REASON_MISS,
StaleHeaderData: staleHeaderData,
}
lookupResponse = &e2e.RouteLookupResponse{
lookupResponse = &rlstest.RouteLookupResponse{
Resp: &rlspb.RouteLookupResponse{
Targets: wantTargets,
HeaderData: wantHeaderData,
Expand All @@ -256,11 +256,11 @@ var (

func testControlChannelCredsSuccess(t *testing.T, sopts []grpc.ServerOption, bopts balancer.BuildOptions) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := setupFakeRLSServer(t, nil, sopts...)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, sopts...)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Setup the RLS server to respond with a valid response.
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return lookupResponse
})

Expand Down Expand Up @@ -356,7 +356,7 @@ func testControlChannelCredsFailure(t *testing.T, sopts []grpc.ServerOption, bop
// Start an RLS server and set the throttler to never throttle requests. The
// creds failures happen before the RPC handler on the server is invoked.
// So, there is need to setup the request and responses on the fake server.
rlsServer, _ := setupFakeRLSServer(t, nil, sopts...)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, sopts...)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Create the control channel to the fake server.
Expand Down Expand Up @@ -454,7 +454,7 @@ func (*unsupportedCredsBundle) NewWithMode(mode string) (credentials.Bundle, err
// TestNewControlChannelUnsupportedCredsBundle tests the case where the control
// channel is configured with a bundle which does not support the mode we use.
func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) {
rlsServer, _ := setupFakeRLSServer(t, nil)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)

// Create the control channel to the fake server.
ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{CredsBundle: &unsupportedCredsBundle{}}, nil)
Expand Down