Skip to content

Commit

Permalink
xds: Add RLS in xDS e2e test (#5281)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Apr 4, 2022
1 parent 0066bf6 commit e583b19
Show file tree
Hide file tree
Showing 10 changed files with 355 additions and 155 deletions.
85 changes: 45 additions & 40 deletions balancer/rls/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
rlstest "google.golang.org/grpc/internal/testutils/rls"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
Expand All @@ -48,10 +49,10 @@ import (
// and the old one is closed.
func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
// Start two RLS servers.
lis1 := newListenerWrapper(t, nil)
rlsServer1, rlsReqCh1 := setupFakeRLSServer(t, lis1)
lis2 := newListenerWrapper(t, nil)
rlsServer2, rlsReqCh2 := setupFakeRLSServer(t, lis2)
lis1 := testutils.NewListenerWrapper(t, nil)
rlsServer1, rlsReqCh1 := rlstest.SetupFakeRLSServer(t, lis1)
lis2 := testutils.NewListenerWrapper(t, nil)
rlsServer2, rlsReqCh2 := rlstest.SetupFakeRLSServer(t, lis2)

// Build RLS service config with the RLS server pointing to the first one.
// Set a very low value for maxAge to ensure that the entry expires soon.
Expand All @@ -61,12 +62,12 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
// Start a couple of test backends, and set up the fake RLS servers to return
// these as a target in the RLS response.
backendCh1, backendAddress1 := startBackend(t)
rlsServer1.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
rlsServer1.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
})
backendCh2, backendAddress2 := startBackend(t)
rlsServer2.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
rlsServer2.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
})

// Register a manual resolver and push the RLS service config through it.
Expand All @@ -84,11 +85,11 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, backendCh1)

// Ensure a connection is established to the first RLS server.
val, err := lis1.newConnCh.Receive(ctx)
val, err := lis1.NewConnCh.Receive(ctx)
if err != nil {
t.Fatal("Timeout expired when waiting for LB policy to create control channel")
}
conn1 := val.(*connWrapper)
conn1 := val.(*testutils.ConnWrapper)

// Make sure an RLS request is sent out.
verifyRLSRequest(t, rlsReqCh1, true)
Expand All @@ -105,12 +106,12 @@ func (s) TestConfigUpdate_ControlChannel(t *testing.T) {
r.UpdateState(resolver.State{ServiceConfig: sc})

// Ensure a connection is established to the second RLS server.
if _, err := lis2.newConnCh.Receive(ctx); err != nil {
if _, err := lis2.NewConnCh.Receive(ctx); err != nil {
t.Fatal("Timeout expired when waiting for LB policy to create control channel")
}

// Ensure the connection to the old one is closed.
if _, err := conn1.closeCh.Receive(ctx); err != nil {
if _, err := conn1.CloseCh.Receive(ctx); err != nil {
t.Fatal("Timeout expired when waiting for LB policy to close control channel")
}

Expand All @@ -136,8 +137,8 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
}

// Start an RLS server with the wrapped listener and credentials.
lis := newListenerWrapper(t, nil)
rlsServer, rlsReqCh := setupFakeRLSServer(t, lis, grpc.Creds(serverCreds))
lis := testutils.NewListenerWrapper(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis, grpc.Creds(serverCreds))
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Build RLS service config.
Expand All @@ -147,8 +148,8 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
// and set up the fake RLS server to return this as the target in the RLS
// response.
backendCh, backendAddress := startBackend(t, grpc.Creds(serverCreds))
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
Expand All @@ -173,7 +174,7 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
verifyRLSRequest(t, rlsReqCh, true)

// Ensure a connection is established to the first RLS server.
if _, err := lis.newConnCh.Receive(ctx); err != nil {
if _, err := lis.NewConnCh.Receive(ctx); err != nil {
t.Fatal("Timeout expired when waiting for LB policy to create control channel")
}
}
Expand All @@ -184,7 +185,7 @@ func (s) TestConfigUpdate_ControlChannelWithCreds(t *testing.T) {
// provided service config is applied for the control channel.
func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Register a balancer to be used for the control channel, and set up a
Expand All @@ -211,8 +212,8 @@ func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) {
// Start a test backend, and set up the fake RLS server to return this as a
// target in the RLS response.
backendCh, backendAddress := startBackend(t)
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
Expand Down Expand Up @@ -244,7 +245,7 @@ func (s) TestConfigUpdate_ControlChannelServiceConfig(t *testing.T) {
// target after the config has been applied.
func (s) TestConfigUpdate_DefaultTarget(t *testing.T) {
// Start an RLS server and set the throttler to always throttle requests.
rlsServer, _ := setupFakeRLSServer(t, nil)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())

// Build RLS service config with a default target.
Expand Down Expand Up @@ -284,16 +285,16 @@ func (s) TestConfigUpdate_DefaultTarget(t *testing.T) {
// child policy configuration are propagated correctly.
func (s) TestConfigUpdate_ChildPolicyConfigs(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Start a default backend and a test backend.
_, defBackendAddress := startBackend(t)
testBackendCh, testBackendAddress := startBackend(t)

// Set up the RLS server to respond with the test backend.
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
})

// Set up a test balancer callback to push configs received by child policies.
Expand Down Expand Up @@ -411,7 +412,7 @@ func (s) TestConfigUpdate_ChildPolicyConfigs(t *testing.T) {
// handled by closing the old balancer and creating a new one.
func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := setupFakeRLSServer(t, nil)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Set up balancer callbacks.
Expand Down Expand Up @@ -507,14 +508,14 @@ func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) {
// the caller of the RPC.
func (s) TestConfigUpdate_BadChildPolicyConfigs(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Set up the RLS server to respond with a bad target field which is expected
// to cause the child policy's ParseTarget to fail and should result in the LB
// policy creating a lame child policy wrapper.
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{e2e.RLSChildPolicyBadTarget}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{e2e.RLSChildPolicyBadTarget}}}
})

// Build RLS service config with a default target. This default backend is
Expand Down Expand Up @@ -567,7 +568,7 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
defer func() { minEvictDuration = origMinEvictDuration }()

// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Register an LB policy to act as the child policy for RLS LB policy.
Expand All @@ -582,14 +583,14 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
// these as targets in the RLS response, based on request keys.
backendCh1, backendAddress1 := startBackend(t)
backendCh2, backendAddress2 := startBackend(t)
rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
if req.KeyMap["k1"] == "v1" {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
}
if req.KeyMap["k2"] == "v2" {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
}
return &e2e.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
})

// Register a manual resolver and push the RLS service config through it.
Expand Down Expand Up @@ -661,7 +662,7 @@ func (s) TestDataCachePurging(t *testing.T) {
defer func() { dataCachePurgeHook = origDataCachePurgeHook }()

// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Register an LB policy to act as the child policy for RLS LB policy.
Expand All @@ -678,8 +679,8 @@ func (s) TestDataCachePurging(t *testing.T) {
// Start a test backend, and set up the fake RLS server to return this as a
// target in the RLS response.
backendCh, backendAddress := startBackend(t)
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
Expand Down Expand Up @@ -740,7 +741,7 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {

// Start an RLS server with the restartable listener and set the throttler to
// never throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, lis)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Override the reset backoff hook to get notified.
Expand Down Expand Up @@ -769,8 +770,8 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
// Start a test backend, and set up the fake RLS server to return this as a
// target in the RLS response.
backendCh, backendAddress := startBackend(t)
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
Expand Down Expand Up @@ -818,7 +819,11 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)

<-resetBackoffDone
select {
case <-ctx.Done():
t.Fatalf("Timed out waiting for resetBackoffDone")
case <-resetBackoffDone:
}

// The fact that the above RPC succeeded indicates that the control channel
// has moved back to READY. The connectivity state monitoring code should have
Expand Down
22 changes: 11 additions & 11 deletions balancer/rls/control_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/rls/internal/test/e2e"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
rlstest "google.golang.org/grpc/internal/testutils/rls"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/testdata"
Expand All @@ -47,7 +47,7 @@ import (
// indicates that the control channel needs to be throttled.
func (s) TestControlChannelThrottled(t *testing.T) {
// Start an RLS server and set the throttler to always throttle requests.
rlsServer, rlsReqCh := setupFakeRLSServer(t, nil)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())

// Create a control channel to the fake RLS server.
Expand All @@ -70,12 +70,12 @@ func (s) TestControlChannelThrottled(t *testing.T) {
// TestLookupFailure tests the case where the RLS server responds with an error.
func (s) TestLookupFailure(t *testing.T) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := setupFakeRLSServer(t, nil)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Setup the RLS server to respond with errors.
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
return &e2e.RouteLookupResponse{Err: errors.New("rls failure")}
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Err: errors.New("rls failure")}
})

// Create a control channel to the fake RLS server.
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s) TestLookupDeadlineExceeded(t *testing.T) {
}

// Start an RLS server and set the throttler to never throttle.
rlsServer, _ := setupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Create a control channel with a small deadline.
Expand Down Expand Up @@ -246,7 +246,7 @@ var (
Reason: rlspb.RouteLookupRequest_REASON_MISS,
StaleHeaderData: staleHeaderData,
}
lookupResponse = &e2e.RouteLookupResponse{
lookupResponse = &rlstest.RouteLookupResponse{
Resp: &rlspb.RouteLookupResponse{
Targets: wantTargets,
HeaderData: wantHeaderData,
Expand All @@ -256,11 +256,11 @@ var (

func testControlChannelCredsSuccess(t *testing.T, sopts []grpc.ServerOption, bopts balancer.BuildOptions) {
// Start an RLS server and set the throttler to never throttle requests.
rlsServer, _ := setupFakeRLSServer(t, nil, sopts...)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, sopts...)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Setup the RLS server to respond with a valid response.
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *e2e.RouteLookupResponse {
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return lookupResponse
})

Expand Down Expand Up @@ -356,7 +356,7 @@ func testControlChannelCredsFailure(t *testing.T, sopts []grpc.ServerOption, bop
// Start an RLS server and set the throttler to never throttle requests. The
// creds failures happen before the RPC handler on the server is invoked.
// So, there is need to setup the request and responses on the fake server.
rlsServer, _ := setupFakeRLSServer(t, nil, sopts...)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, sopts...)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Create the control channel to the fake server.
Expand Down Expand Up @@ -454,7 +454,7 @@ func (*unsupportedCredsBundle) NewWithMode(mode string) (credentials.Bundle, err
// TestNewControlChannelUnsupportedCredsBundle tests the case where the control
// channel is configured with a bundle which does not support the mode we use.
func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) {
rlsServer, _ := setupFakeRLSServer(t, nil)
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)

// Create the control channel to the fake server.
ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{CredsBundle: &unsupportedCredsBundle{}}, nil)
Expand Down

0 comments on commit e583b19

Please sign in to comment.