From 232759f73ff61d6629fa189287392980e6146c9c Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 8 Oct 2020 12:20:48 -0700 Subject: [PATCH] [lrs_stream_report] test for lrs client --- xds/internal/client/client.go | 1 + xds/internal/client/client_loadreport.go | 3 - xds/internal/client/client_loadreport_test.go | 128 ++++++++++++++++++ xds/internal/testutils/fakeserver/server.go | 4 +- 4 files changed, 131 insertions(+), 5 deletions(-) create mode 100644 xds/internal/client/client_loadreport_test.go diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index a2a783988326..c81ed69c79cd 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -354,6 +354,7 @@ func New(opts Options) (*Client, error) { cdsCache: make(map[string]ClusterUpdate), edsWatchers: make(map[string]map[*watchInfo]bool), edsCache: make(map[string]EndpointsUpdate), + lrsClients: make(map[string]*lrsClient), } cc, err := grpc.Dial(opts.Config.BalancerName, dopts...) diff --git a/xds/internal/client/client_loadreport.go b/xds/internal/client/client_loadreport.go index 8c8cf2664f9f..9313fa01ad77 100644 --- a/xds/internal/client/client_loadreport.go +++ b/xds/internal/client/client_loadreport.go @@ -72,9 +72,6 @@ type lrsClient struct { } // newLRSClient creates a new LRS stream to the server. -// -// If the server is different from the xDS server in the parent xDS client, a -// new ClientConn will be created. func newLRSClient(parent *Client, server string) *lrsClient { return &lrsClient{ parent: parent, diff --git a/xds/internal/client/client_loadreport_test.go b/xds/internal/client/client_loadreport_test.go new file mode 100644 index 000000000000..e39000ca9d8d --- /dev/null +++ b/xds/internal/client/client_loadreport_test.go @@ -0,0 +1,128 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package client_test + +import ( + "context" + "testing" + "time" + + v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds/internal/client" + "google.golang.org/grpc/xds/internal/client/bootstrap" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" + "google.golang.org/grpc/xds/internal/version" + + _ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client. +) + +const ( + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func (s) TestLRSClient(t *testing.T) { + fs, sCleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("failed to start fake xDS server: %v", err) + } + defer sCleanup() + + xdsC, err := client.New(client.Options{ + Config: bootstrap.Config{ + BalancerName: fs.Address, + Creds: grpc.WithInsecure(), + NodeProto: &v2corepb.Node{}, + TransportAPI: version.TransportV2, + }, + }) + if err != nil { + t.Fatalf("failed to create xds client: %v", err) + } + defer xdsC.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if u, err := fs.NewConnChan.Receive(ctx); err != nil { + t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) + } + + // Report to the same address should not create new ClientConn. + store1, lrsCancel1 := xdsC.ReportLoad(fs.Address) + defer lrsCancel1() + ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if u, err := fs.NewConnChan.Receive(ctx); err != context.DeadlineExceeded { + t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err) + } + + fs2, sCleanup2, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("failed to start fake xDS server: %v", err) + } + defer sCleanup2() + + // Report to a different address should create new ClientConn. + store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address) + defer lrsCancel2() + ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if u, err := fs2.NewConnChan.Receive(ctx); err != nil { + t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) + } + + if store1 == store2 { + t.Fatalf("got same store for different servers, want different") + } + + ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil { + t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) + } + + // Send one resp to the client. + fs2.LRSResponseChan <- &fakeserver.Response{ + Resp: &lrspb.LoadStatsResponse{SendAllClusters: true}, + } + // Wait for the send to finish, so server will wait for the next request. + time.Sleep(defaultTestShortTimeout) + + // Cancel this load reporting stream, server should see error canceled. + lrsCancel2() + + // Server should receive a stream canceled error. + ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil || status.Code(u.(*fakeserver.Request).Err) != codes.Canceled { + t.Errorf("unexpected LRS request: %v, %v, want error canceled", u, err) + } +} diff --git a/xds/internal/testutils/fakeserver/server.go b/xds/internal/testutils/fakeserver/server.go index 4cff72087cec..994f5308f3af 100644 --- a/xds/internal/testutils/fakeserver/server.go +++ b/xds/internal/testutils/fakeserver/server.go @@ -203,10 +203,10 @@ type lrsServer struct { func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoadStatsServer) error { req, err := s.Recv() + lrsS.reqChan.Send(&Request{req, err}) if err != nil { return err } - lrsS.reqChan.Send(&Request{req, err}) select { case r := <-lrsS.respChan: @@ -222,12 +222,12 @@ func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoad for { req, err := s.Recv() + lrsS.reqChan.Send(&Request{req, err}) if err != nil { if err == io.EOF { return nil } return err } - lrsS.reqChan.Send(&Request{req, err}) } }