From 298349c91341480a43426fa75d03482f7f8895ac Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 1 Jun 2021 15:53:23 -0700 Subject: [PATCH 1/2] [csds_no_error] csds: return empty response if xds client is not set --- xds/csds/csds.go | 17 +++++++---- xds/csds/csds_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 6 deletions(-) diff --git a/xds/csds/csds.go b/xds/csds/csds.go index 73b92e9443c..2c8b951eaf8 100644 --- a/xds/csds/csds.go +++ b/xds/csds/csds.go @@ -25,7 +25,6 @@ package csds import ( "context" - "fmt" "io" "time" @@ -76,11 +75,12 @@ type ClientStatusDiscoveryServer struct { func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) { xdsC, err := newXDSClient() if err != nil { - return nil, fmt.Errorf("failed to create xds client: %v", err) + logger.Warningf("failed to create xds client: %v", err) + // Return an explicit nil here, otherwise the nil returned from + // client.New() is a typed nil (type *Client, not the interface). + return &ClientStatusDiscoveryServer{}, nil } - return &ClientStatusDiscoveryServer{ - xdsClient: xdsC, - }, nil + return &ClientStatusDiscoveryServer{xdsClient: xdsC}, nil } // StreamClientStatus implementations interface ClientStatusDiscoveryServiceServer. @@ -113,6 +113,9 @@ func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req * // // If it returns an error, the error is a status error. func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) { + if s.xdsClient == nil { + return &v3statuspb.ClientStatusResponse{}, nil + } // Field NodeMatchers is unsupported, by design // https://github.com/grpc/proposal/blob/master/A40-csds-support.md#detail-node-matching. if len(req.NodeMatchers) != 0 { @@ -137,7 +140,9 @@ func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statusp // Close cleans up the resources. func (s *ClientStatusDiscoveryServer) Close() { - s.xdsClient.Close() + if s.xdsClient != nil { + s.xdsClient.Close() + } } // nodeProtoToV3 converts the given proto into a v3.Node. n is from bootstrap diff --git a/xds/csds/csds_test.go b/xds/csds/csds_test.go index 6cf88f6d394..f5a049638b9 100644 --- a/xds/csds/csds_test.go +++ b/xds/csds/csds_test.go @@ -37,6 +37,7 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/xds" "google.golang.org/grpc/xds/internal/client" + "google.golang.org/grpc/xds/internal/client/bootstrap" _ "google.golang.org/grpc/xds/internal/httpfilter/router" xtestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/e2e" @@ -635,6 +636,73 @@ func protoToJSON(p proto.Message) string { return ret } +type errorXDSClient struct{} + +func (t errorXDSClient) DumpLDS() (string, map[string]client.UpdateWithMD) { panic("implement me") } +func (t errorXDSClient) DumpRDS() (string, map[string]client.UpdateWithMD) { panic("implement me") } +func (t errorXDSClient) DumpCDS() (string, map[string]client.UpdateWithMD) { panic("implement me") } +func (t errorXDSClient) DumpEDS() (string, map[string]client.UpdateWithMD) { panic("implement me") } +func (t errorXDSClient) BootstrapConfig() *bootstrap.Config { panic("implement me") } +func (t errorXDSClient) Close() { panic("implement me") } + +func newErrorXDSClient() (*errorXDSClient, error) { + return nil, fmt.Errorf("typed nil") +} + +func TestCSDSNoXDSClient(t *testing.T) { + oldNewXDSClient := newXDSClient + newXDSClient = func() (xdsClientInterface, error) { + return newErrorXDSClient() + } + defer func() { newXDSClient = oldNewXDSClient }() + + // Initialize an gRPC server and register CSDS on it. + server := grpc.NewServer() + csdss, err := NewClientStatusDiscoveryServer() + if err != nil { + t.Fatal(err) + } + defer csdss.Close() + v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss) + // Create a local listener and pass it to Serve(). + lis, err := xtestutils.LocalTCPListener() + if err != nil { + t.Fatalf("xtestutils.LocalTCPListener() failed: %v", err) + } + go func() { + if err := server.Serve(lis); err != nil { + t.Errorf("Serve() failed: %v", err) + } + }() + defer server.Stop() + + // Create CSDS client. + conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) + if err != nil { + t.Fatalf("cannot connect to server: %v", err) + } + defer conn.Close() + c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true)) + if err != nil { + t.Fatalf("cannot get ServerReflectionInfo: %v", err) + } + + if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil { + t.Fatalf("failed to send: %v", err) + } + r, err := stream.Recv() + if err != nil { + // io.EOF is not ok. + t.Fatalf("failed to recv response: %v", err) + } + if n := len(r.Config); n != 0 { + t.Fatalf("got %d configs, want 0: %v", n, proto.MarshalTextString(r)) + } +} + func Test_nodeProtoToV3(t *testing.T) { const ( testID = "test-id" From 9b021b9db49b24b889d99e28d28678d882d709e9 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 2 Jun 2021 10:21:40 -0700 Subject: [PATCH 2/2] [csds_no_error] c1 --- xds/csds/csds.go | 24 +++++++++++++----------- xds/csds/csds_test.go | 22 ++-------------------- 2 files changed, 15 insertions(+), 31 deletions(-) diff --git a/xds/csds/csds.go b/xds/csds/csds.go index 2c8b951eaf8..468580f0b6a 100644 --- a/xds/csds/csds.go +++ b/xds/csds/csds.go @@ -58,14 +58,23 @@ type xdsClientInterface interface { var ( logger = grpclog.Component("xds") - newXDSClient = func() (xdsClientInterface, error) { - return client.New() + newXDSClient = func() xdsClientInterface { + c, err := client.New() + if err != nil { + // If err is not nil, c is a typed nil (of type *xdsclient.Client). + // If c is returned and assigned to the xdsClient field in the CSDS + // server, the nil checks in the handlers will not handle it + // properly. + logger.Warningf("failed to create xds client: %v", err) + return nil + } + return c } ) // ClientStatusDiscoveryServer implementations interface ClientStatusDiscoveryServiceServer. type ClientStatusDiscoveryServer struct { - // xdsClient will always be the same in practise. But we keep a copy in each + // xdsClient will always be the same in practice. But we keep a copy in each // server instance for testing. xdsClient xdsClientInterface } @@ -73,14 +82,7 @@ type ClientStatusDiscoveryServer struct { // NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be // registered on a gRPC server. func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) { - xdsC, err := newXDSClient() - if err != nil { - logger.Warningf("failed to create xds client: %v", err) - // Return an explicit nil here, otherwise the nil returned from - // client.New() is a typed nil (type *Client, not the interface). - return &ClientStatusDiscoveryServer{}, nil - } - return &ClientStatusDiscoveryServer{xdsClient: xdsC}, nil + return &ClientStatusDiscoveryServer{xdsClient: newXDSClient()}, nil } // StreamClientStatus implementations interface ClientStatusDiscoveryServiceServer. diff --git a/xds/csds/csds_test.go b/xds/csds/csds_test.go index f5a049638b9..a051092a840 100644 --- a/xds/csds/csds_test.go +++ b/xds/csds/csds_test.go @@ -37,7 +37,6 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/xds" "google.golang.org/grpc/xds/internal/client" - "google.golang.org/grpc/xds/internal/client/bootstrap" _ "google.golang.org/grpc/xds/internal/httpfilter/router" xtestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/e2e" @@ -276,9 +275,7 @@ func commonSetup(t *testing.T) (xdsClientInterfaceWithWatch, *e2e.ManagementServ t.Fatalf("failed to create xds client: %v", err) } oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClientInterface, error) { - return xdsC, nil - } + newXDSClient = func() xdsClientInterface { return xdsC } // Initialize an gRPC server and register CSDS on it. server := grpc.NewServer() @@ -636,24 +633,9 @@ func protoToJSON(p proto.Message) string { return ret } -type errorXDSClient struct{} - -func (t errorXDSClient) DumpLDS() (string, map[string]client.UpdateWithMD) { panic("implement me") } -func (t errorXDSClient) DumpRDS() (string, map[string]client.UpdateWithMD) { panic("implement me") } -func (t errorXDSClient) DumpCDS() (string, map[string]client.UpdateWithMD) { panic("implement me") } -func (t errorXDSClient) DumpEDS() (string, map[string]client.UpdateWithMD) { panic("implement me") } -func (t errorXDSClient) BootstrapConfig() *bootstrap.Config { panic("implement me") } -func (t errorXDSClient) Close() { panic("implement me") } - -func newErrorXDSClient() (*errorXDSClient, error) { - return nil, fmt.Errorf("typed nil") -} - func TestCSDSNoXDSClient(t *testing.T) { oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClientInterface, error) { - return newErrorXDSClient() - } + newXDSClient = func() xdsClientInterface { return nil } defer func() { newXDSClient = oldNewXDSClient }() // Initialize an gRPC server and register CSDS on it.