Skip to content

Commit

Permalink
xds: use separate update channels for listeners in test (#4712)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Sep 3, 2021
1 parent c93e472 commit b2ba77a
Showing 1 changed file with 88 additions and 81 deletions.
169 changes: 88 additions & 81 deletions xds/internal/test/xds_server_serving_mode_test.go
Expand Up @@ -24,64 +24,20 @@ package xds_test

import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"

v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal/testutils"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/xds"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/e2e"
)

// A convenience typed used to keep track of mode changes on multiple listeners.
type modeTracker struct {
mu sync.Mutex
modes map[string]xds.ServingMode
updateCh *testutils.Channel
}

func newModeTracker() *modeTracker {
return &modeTracker{
modes: make(map[string]xds.ServingMode),
updateCh: testutils.NewChannel(),
}
}

func (mt *modeTracker) updateMode(ctx context.Context, addr net.Addr, mode xds.ServingMode) {
mt.mu.Lock()
defer mt.mu.Unlock()

mt.modes[addr.String()] = mode
// Sometimes we could get state updates which are not expected by the test.
// Using `Send()` here would block in that case and cause the whole test to
// hang and will eventually only timeout when the `-timeout` passed to `go
// test` elapses. Using `SendContext()` here instead fails the test within a
// reasonable timeout.
mt.updateCh.SendContext(ctx, nil)
}

func (mt *modeTracker) getMode(addr net.Addr) xds.ServingMode {
mt.mu.Lock()
defer mt.mu.Unlock()
return mt.modes[addr.String()]
}

func (mt *modeTracker) waitForUpdate(ctx context.Context) error {
_, err := mt.updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("error when waiting for a mode change update: %v", err)
}
return nil
}

// TestServerSideXDS_ServingModeChanges tests the serving mode functionality in
// xDS enabled gRPC servers. It verifies that appropriate mode changes happen in
// the server, and also verifies behavior of clientConns under these modes.
Expand All @@ -97,30 +53,41 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
t.Fatal(err)
}

// Create a server option to get notified about serving mode changes.
// Create two local listeners and pass it to Serve().
lis1, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
lis2, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}

// Create a couple of channels on which mode updates will be pushed.
updateCh1 := make(chan xds.ServingMode, 1)
updateCh2 := make(chan xds.ServingMode, 1)

// Create a server option to get notified about serving mode changes, and
// push the updated mode on the channels created above.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
modeTracker := newModeTracker()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
modeTracker.updateMode(ctx, addr, args.Mode)
switch addr.String() {
case lis1.Addr().String():
updateCh1 <- args.Mode
case lis2.Addr().String():
updateCh2 <- args.Mode
default:
t.Logf("serving mode callback invoked for unknown listener address: %q", addr.String())
}
})

// Initialize an xDS-enabled gRPC server and register the stubServer on it.
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
defer server.Stop()
testpb.RegisterTestServiceServer(server, &testService{})

// Create two local listeners and pass it to Serve().
lis1, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
lis2, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}

// Setup the management server to respond with server-side Listener
// resources for both listeners.
host1, port1, err := hostPortFromListener(lis1)
Expand Down Expand Up @@ -153,11 +120,21 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
}()

// Wait for both listeners to move to "serving" mode.
if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeServing); err != nil {
t.Fatal(err)
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
}
}
if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeServing); err != nil {
t.Fatal(err)
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
}
}

// Create a ClientConn to the first listener and make a successful RPCs.
Expand All @@ -184,8 +161,25 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
}); err != nil {
t.Error(err)
}
if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeNotServing); err != nil {
t.Fatal(err)

// Wait for lis2 to move to "not-serving" mode. lis1 also receives an update
// here even though it stays in "serving" mode.
// See https://github.com/grpc/grpc-go/issues/4695.
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing)
}
}

// Make sure RPCs succeed on cc1 and fail on cc2.
Expand All @@ -201,8 +195,17 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
}); err != nil {
t.Error(err)
}
if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeNotServing); err != nil {
t.Fatal(err)

// Wait for lis1 to move to "not-serving" mode. lis2 was already removed
// from the xdsclient's resource cache. So, lis2's callback will not be
// invoked this time around.
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing)
}
}

// Make sure RPCs fail on both.
Expand All @@ -226,29 +229,28 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
}

// Wait for both listeners to move to "serving" mode.
if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeServing); err != nil {
t.Fatal(err)
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
}
}
if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeServing); err != nil {
t.Fatal(err)
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
}
}

// The clientConns created earlier should be able to make RPCs now.
waitForSuccessfulRPC(ctx, t, cc1)
waitForSuccessfulRPC(ctx, t, cc2)
}

func waitForModeChange(ctx context.Context, modeTracker *modeTracker, addr net.Addr, wantMode xds.ServingMode) error {
for {
if gotMode := modeTracker.getMode(addr); gotMode == wantMode {
return nil
}
if err := modeTracker.waitForUpdate(ctx); err != nil {
return err
}
}
}

func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
t.Helper()

Expand All @@ -261,8 +263,13 @@ func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn
func waitForFailedRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
t.Helper()

// Attempt one RPC before waiting for the ticker to expire.
c := testpb.NewTestServiceClient(cc)
ticker := time.NewTimer(10 * time.Millisecond)
if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
return
}

ticker := time.NewTimer(1 * time.Second)
defer ticker.Stop()
for {
select {
Expand Down

0 comments on commit b2ba77a

Please sign in to comment.