diff --git a/xds/csds/csds.go b/xds/csds/csds.go index 73b92e9443c..468580f0b6a 100644 --- a/xds/csds/csds.go +++ b/xds/csds/csds.go @@ -25,7 +25,6 @@ package csds import ( "context" - "fmt" "io" "time" @@ -59,14 +58,23 @@ type xdsClientInterface interface { var ( logger = grpclog.Component("xds") - newXDSClient = func() (xdsClientInterface, error) { - return client.New() + newXDSClient = func() xdsClientInterface { + c, err := client.New() + if err != nil { + // If err is not nil, c is a typed nil (of type *xdsclient.Client). + // If c is returned and assigned to the xdsClient field in the CSDS + // server, the nil checks in the handlers will not handle it + // properly. + logger.Warningf("failed to create xds client: %v", err) + return nil + } + return c } ) // ClientStatusDiscoveryServer implementations interface ClientStatusDiscoveryServiceServer. type ClientStatusDiscoveryServer struct { - // xdsClient will always be the same in practise. But we keep a copy in each + // xdsClient will always be the same in practice. But we keep a copy in each // server instance for testing. xdsClient xdsClientInterface } @@ -74,13 +82,7 @@ type ClientStatusDiscoveryServer struct { // NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be // registered on a gRPC server. func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) { - xdsC, err := newXDSClient() - if err != nil { - return nil, fmt.Errorf("failed to create xds client: %v", err) - } - return &ClientStatusDiscoveryServer{ - xdsClient: xdsC, - }, nil + return &ClientStatusDiscoveryServer{xdsClient: newXDSClient()}, nil } // StreamClientStatus implementations interface ClientStatusDiscoveryServiceServer. @@ -113,6 +115,9 @@ func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req * // // If it returns an error, the error is a status error. func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) { + if s.xdsClient == nil { + return &v3statuspb.ClientStatusResponse{}, nil + } // Field NodeMatchers is unsupported, by design // https://github.com/grpc/proposal/blob/master/A40-csds-support.md#detail-node-matching. if len(req.NodeMatchers) != 0 { @@ -137,7 +142,9 @@ func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statusp // Close cleans up the resources. func (s *ClientStatusDiscoveryServer) Close() { - s.xdsClient.Close() + if s.xdsClient != nil { + s.xdsClient.Close() + } } // nodeProtoToV3 converts the given proto into a v3.Node. n is from bootstrap diff --git a/xds/csds/csds_test.go b/xds/csds/csds_test.go index 6cf88f6d394..a051092a840 100644 --- a/xds/csds/csds_test.go +++ b/xds/csds/csds_test.go @@ -275,9 +275,7 @@ func commonSetup(t *testing.T) (xdsClientInterfaceWithWatch, *e2e.ManagementServ t.Fatalf("failed to create xds client: %v", err) } oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClientInterface, error) { - return xdsC, nil - } + newXDSClient = func() xdsClientInterface { return xdsC } // Initialize an gRPC server and register CSDS on it. server := grpc.NewServer() @@ -635,6 +633,58 @@ func protoToJSON(p proto.Message) string { return ret } +func TestCSDSNoXDSClient(t *testing.T) { + oldNewXDSClient := newXDSClient + newXDSClient = func() xdsClientInterface { return nil } + defer func() { newXDSClient = oldNewXDSClient }() + + // Initialize an gRPC server and register CSDS on it. + server := grpc.NewServer() + csdss, err := NewClientStatusDiscoveryServer() + if err != nil { + t.Fatal(err) + } + defer csdss.Close() + v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss) + // Create a local listener and pass it to Serve(). + lis, err := xtestutils.LocalTCPListener() + if err != nil { + t.Fatalf("xtestutils.LocalTCPListener() failed: %v", err) + } + go func() { + if err := server.Serve(lis); err != nil { + t.Errorf("Serve() failed: %v", err) + } + }() + defer server.Stop() + + // Create CSDS client. + conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) + if err != nil { + t.Fatalf("cannot connect to server: %v", err) + } + defer conn.Close() + c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true)) + if err != nil { + t.Fatalf("cannot get ServerReflectionInfo: %v", err) + } + + if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil { + t.Fatalf("failed to send: %v", err) + } + r, err := stream.Recv() + if err != nil { + // io.EOF is not ok. + t.Fatalf("failed to recv response: %v", err) + } + if n := len(r.Config); n != 0 { + t.Fatalf("got %d configs, want 0: %v", n, proto.MarshalTextString(r)) + } +} + func Test_nodeProtoToV3(t *testing.T) { const ( testID = "test-id"