diff --git a/xds/internal/test/xds_server_serving_mode_test.go b/xds/internal/test/xds_server_serving_mode_test.go index 2178cf359bd..ee353ca74c0 100644 --- a/xds/internal/test/xds_server_serving_mode_test.go +++ b/xds/internal/test/xds_server_serving_mode_test.go @@ -24,9 +24,7 @@ package xds_test import ( "context" - "fmt" "net" - "sync" "testing" "time" @@ -34,54 +32,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" xdscreds "google.golang.org/grpc/credentials/xds" - "google.golang.org/grpc/internal/testutils" testpb "google.golang.org/grpc/test/grpc_testing" "google.golang.org/grpc/xds" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/e2e" ) -// A convenience typed used to keep track of mode changes on multiple listeners. -type modeTracker struct { - mu sync.Mutex - modes map[string]xds.ServingMode - updateCh *testutils.Channel -} - -func newModeTracker() *modeTracker { - return &modeTracker{ - modes: make(map[string]xds.ServingMode), - updateCh: testutils.NewChannel(), - } -} - -func (mt *modeTracker) updateMode(ctx context.Context, addr net.Addr, mode xds.ServingMode) { - mt.mu.Lock() - defer mt.mu.Unlock() - - mt.modes[addr.String()] = mode - // Sometimes we could get state updates which are not expected by the test. - // Using `Send()` here would block in that case and cause the whole test to - // hang and will eventually only timeout when the `-timeout` passed to `go - // test` elapses. Using `SendContext()` here instead fails the test within a - // reasonable timeout. - mt.updateCh.SendContext(ctx, nil) -} - -func (mt *modeTracker) getMode(addr net.Addr) xds.ServingMode { - mt.mu.Lock() - defer mt.mu.Unlock() - return mt.modes[addr.String()] -} - -func (mt *modeTracker) waitForUpdate(ctx context.Context) error { - _, err := mt.updateCh.Receive(ctx) - if err != nil { - return fmt.Errorf("error when waiting for a mode change update: %v", err) - } - return nil -} - // TestServerSideXDS_ServingModeChanges tests the serving mode functionality in // xDS enabled gRPC servers. It verifies that appropriate mode changes happen in // the server, and also verifies behavior of clientConns under these modes. @@ -97,13 +53,34 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { t.Fatal(err) } - // Create a server option to get notified about serving mode changes. + // Create two local listeners and pass it to Serve(). + lis1, err := xdstestutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + lis2, err := xdstestutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + + // Create a couple of channels on which mode updates will be pushed. + updateCh1 := make(chan xds.ServingMode, 1) + updateCh2 := make(chan xds.ServingMode, 1) + + // Create a server option to get notified about serving mode changes, and + // push the updated mode on the channels created above. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - modeTracker := newModeTracker() modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) - modeTracker.updateMode(ctx, addr, args.Mode) + switch addr.String() { + case lis1.Addr().String(): + updateCh1 <- args.Mode + case lis2.Addr().String(): + updateCh2 <- args.Mode + default: + t.Logf("serving mode callback invoked for unknown listener address: %q", addr.String()) + } }) // Initialize an xDS-enabled gRPC server and register the stubServer on it. @@ -111,16 +88,6 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { defer server.Stop() testpb.RegisterTestServiceServer(server, &testService{}) - // Create two local listeners and pass it to Serve(). - lis1, err := xdstestutils.LocalTCPListener() - if err != nil { - t.Fatalf("testutils.LocalTCPListener() failed: %v", err) - } - lis2, err := xdstestutils.LocalTCPListener() - if err != nil { - t.Fatalf("testutils.LocalTCPListener() failed: %v", err) - } - // Setup the management server to respond with server-side Listener // resources for both listeners. host1, port1, err := hostPortFromListener(lis1) @@ -153,11 +120,21 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { }() // Wait for both listeners to move to "serving" mode. - if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeServing); err != nil { - t.Fatal(err) + select { + case <-ctx.Done(): + t.Fatalf("timed out waiting for a mode change update: %v", err) + case mode := <-updateCh1: + if mode != xds.ServingModeServing { + t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing) + } } - if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeServing); err != nil { - t.Fatal(err) + select { + case <-ctx.Done(): + t.Fatalf("timed out waiting for a mode change update: %v", err) + case mode := <-updateCh2: + if mode != xds.ServingModeServing { + t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing) + } } // Create a ClientConn to the first listener and make a successful RPCs. @@ -184,8 +161,25 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { }); err != nil { t.Error(err) } - if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeNotServing); err != nil { - t.Fatal(err) + + // Wait for lis2 to move to "not-serving" mode. lis1 also receives an update + // here even though it stays in "serving" mode. + // See https://github.com/grpc/grpc-go/issues/4695. + select { + case <-ctx.Done(): + t.Fatalf("timed out waiting for a mode change update: %v", err) + case mode := <-updateCh1: + if mode != xds.ServingModeServing { + t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing) + } + } + select { + case <-ctx.Done(): + t.Fatalf("timed out waiting for a mode change update: %v", err) + case mode := <-updateCh2: + if mode != xds.ServingModeNotServing { + t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing) + } } // Make sure RPCs succeed on cc1 and fail on cc2. @@ -201,8 +195,17 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { }); err != nil { t.Error(err) } - if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeNotServing); err != nil { - t.Fatal(err) + + // Wait for lis1 to move to "not-serving" mode. lis2 was already removed + // from the xdsclient's resource cache. So, lis2's callback will not be + // invoked this time around. + select { + case <-ctx.Done(): + t.Fatalf("timed out waiting for a mode change update: %v", err) + case mode := <-updateCh1: + if mode != xds.ServingModeNotServing { + t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing) + } } // Make sure RPCs fail on both. @@ -226,11 +229,21 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { } // Wait for both listeners to move to "serving" mode. - if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeServing); err != nil { - t.Fatal(err) + select { + case <-ctx.Done(): + t.Fatalf("timed out waiting for a mode change update: %v", err) + case mode := <-updateCh1: + if mode != xds.ServingModeServing { + t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing) + } } - if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeServing); err != nil { - t.Fatal(err) + select { + case <-ctx.Done(): + t.Fatalf("timed out waiting for a mode change update: %v", err) + case mode := <-updateCh2: + if mode != xds.ServingModeServing { + t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing) + } } // The clientConns created earlier should be able to make RPCs now. @@ -238,17 +251,6 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { waitForSuccessfulRPC(ctx, t, cc2) } -func waitForModeChange(ctx context.Context, modeTracker *modeTracker, addr net.Addr, wantMode xds.ServingMode) error { - for { - if gotMode := modeTracker.getMode(addr); gotMode == wantMode { - return nil - } - if err := modeTracker.waitForUpdate(ctx); err != nil { - return err - } - } -} - func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) { t.Helper() @@ -261,8 +263,13 @@ func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn func waitForFailedRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) { t.Helper() + // Attempt one RPC before waiting for the ticker to expire. c := testpb.NewTestServiceClient(cc) - ticker := time.NewTimer(10 * time.Millisecond) + if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil { + return + } + + ticker := time.NewTimer(1 * time.Second) defer ticker.Stop() for { select {