Skip to content

Commit

Permalink
Merge pull request #974 from pohly/node-errors
Browse files Browse the repository at this point in the history
node errors
  • Loading branch information
avalluri committed Jun 7, 2021
2 parents f4c19e5 + 95e3981 commit 9cc771f
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 24 deletions.
6 changes: 3 additions & 3 deletions pkg/grpc-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"fmt"
"sync"

"github.com/intel/pmem-csi/pkg/pmem-grpc"
pmemgrpc "github.com/intel/pmem-csi/pkg/pmem-grpc"
"github.com/kubernetes-csi/csi-lib-utils/metrics"
"google.golang.org/grpc"
"k8s.io/klog/v2"
Expand All @@ -33,11 +33,11 @@ func NewNonBlockingGRPCServer() *NonBlockingGRPCServer {
return &NonBlockingGRPCServer{}
}

func (s *NonBlockingGRPCServer) Start(endpoint string, tlsConfig *tls.Config, csiMetricsManager metrics.CSIMetricsManager, services ...Service) error {
func (s *NonBlockingGRPCServer) Start(endpoint, errorPrefix string, tlsConfig *tls.Config, csiMetricsManager metrics.CSIMetricsManager, services ...Service) error {
if endpoint == "" {
return fmt.Errorf("endpoint cannot be empty")
}
rpcServer, l, err := pmemgrpc.NewServer(endpoint, tlsConfig, csiMetricsManager)
rpcServer, l, err := pmemgrpc.NewServer(endpoint, errorPrefix, tlsConfig, csiMetricsManager)
if err != nil {
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/pmem-csi-driver/controllerserver-node.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (cs *nodeControllerServer) createVolumeInternal(ctx context.Context,
asked := capacity.GetRequiredBytes()
if vol := cs.getVolumeByName(volumeName); vol != nil {
// Check if the size of existing volume can cover the new request
klog.V(4).Infof("Node CreateVolume: volume exists, name:%q id:%s size:%v", volumeName, vol.ID, vol.Size)
klog.V(4).Infof("CreateVolume: volume exists, name:%q id:%s size:%v", volumeName, vol.ID, vol.Size)
if vol.Size < asked {
statusErr = status.Error(codes.AlreadyExists, fmt.Sprintf("smaller volume with the same name %q already exists", volumeName))
return
Expand All @@ -218,8 +218,8 @@ func (cs *nodeControllerServer) createVolumeInternal(ctx context.Context,
return
}

klog.V(4).Infof("Node CreateVolume: Name:%q req.Required:%v req.Limit:%v", volumeName, asked, capacity.GetLimitBytes())
volumeID = generateVolumeID("Node CreateVolume", volumeName)
klog.V(4).Infof("CreateVolume: Name:%q req.Required:%v req.Limit:%v", volumeName, asked, capacity.GetLimitBytes())
volumeID = generateVolumeID("CreateVolume", volumeName)
// Check do we have entry with newly generated VolumeID already
if vol := cs.getVolumeByID(volumeID); vol != nil {
// if we have, that has to be VolumeID collision, because above we checked
Expand All @@ -245,7 +245,7 @@ func (cs *nodeControllerServer) createVolumeInternal(ctx context.Context,
// Writing this state after creating the volume has the risk that
// we leak the volume if we don't get around to storing the state.
if err := cs.sm.Create(volumeID, vol); err != nil {
statusErr = status.Error(codes.Internal, "Node CreateVolume: "+err.Error())
statusErr = status.Error(codes.Internal, "store state: "+err.Error())
return
}
defer func() {
Expand All @@ -269,7 +269,7 @@ func (cs *nodeControllerServer) createVolumeInternal(ctx context.Context,
if errors.Is(err, pmemerr.NotEnoughSpace) {
code = codes.ResourceExhausted
}
statusErr = status.Errorf(code, "Node CreateVolume: device creation failed: %v", err)
statusErr = status.Errorf(code, "device creation failed: %v", err)
return
}
// TODO(?): determine and return actual size here?
Expand All @@ -278,7 +278,7 @@ func (cs *nodeControllerServer) createVolumeInternal(ctx context.Context,
cs.mutex.Lock()
defer cs.mutex.Unlock()
cs.pmemVolumes[volumeID] = vol
klog.V(3).Infof("Node CreateVolume: Record new volume as %v", *vol)
klog.V(3).Infof("CreateVolume: Record new volume as %v", *vol)

return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pmem-csi-driver/pmem-csi-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (csid *csiDriver) Run() error {
ns := NewNodeServer(cs, filepath.Clean(csid.cfg.StateBasePath)+"/mount")

services := []grpcserver.Service{ids, ns, cs}
if err := s.Start(csid.cfg.Endpoint, nil, cmm, services...); err != nil {
if err := s.Start(csid.cfg.Endpoint, csid.cfg.NodeID, nil, cmm, services...); err != nil {
return err
}

Expand Down
23 changes: 21 additions & 2 deletions pkg/pmem-grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pmemgrpc

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
Expand All @@ -17,6 +18,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"

pmemcommon "github.com/intel/pmem-csi/pkg/pmem-common"
Expand Down Expand Up @@ -54,8 +56,9 @@ func Connect(endpoint string, tlsConfig *tls.Config, dialOptions ...grpc.DialOpt
return grpc.Dial(address, dialOptions...)
}

//NewServer is a helper function to start a grpc server at given endpoint and uses provided tlsConfig
func NewServer(endpoint string, tlsConfig *tls.Config, csiMetricsManager metrics.CSIMetricsManager, opts ...grpc.ServerOption) (*grpc.Server, net.Listener, error) {
// NewServer is a helper function to start a grpc server at the given endpoint.
// The error prefix is added to all error messages if not empty.
func NewServer(endpoint, errorPrefix string, tlsConfig *tls.Config, csiMetricsManager metrics.CSIMetricsManager, opts ...grpc.ServerOption) (*grpc.Server, net.Listener, error) {
proto, addr, err := parseEndpoint(endpoint)
if err != nil {
return nil, nil, err
Expand All @@ -75,6 +78,22 @@ func NewServer(endpoint string, tlsConfig *tls.Config, csiMetricsManager metrics
interceptors := []grpc.UnaryServerInterceptor{
pmemcommon.LogGRPCServer,
}
if errorPrefix != "" {
interceptors = append(interceptors,
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
resp, err := handler(ctx, req)
if err != nil {
// We loose any additional details here that might be attached
// to the status, but that's okay because we shouldn't have any.
originalStatus := status.Convert(err)
extendedStatus := status.New(originalStatus.Code(),
errorPrefix+": "+originalStatus.Message())
// Return the extended error.
return resp, extendedStatus.Err()
}
return resp, err
})
}
if csiMetricsManager != nil {
interceptors = append(interceptors,
connection.ExtendedCSIMetricsManager{CSIMetricsManager: csiMetricsManager}.RecordMetricsServerInterceptor)
Expand Down
24 changes: 12 additions & 12 deletions pkg/pmem-state/pmem-state.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ func (fs *fileState) Create(id string, data interface{}) error {
// Create new file for synchronous writes
fp, err := os.OpenFile(file, os.O_WRONLY|os.O_SYNC|os.O_CREATE|os.O_EXCL, 0600)
if err != nil {
return fmt.Errorf("file-state: failed to create metadata storage file %q: %w", file, err)
return fmt.Errorf("failed to create state file: %w", err)
}

if err := json.NewEncoder(fp).Encode(data); err != nil {
// cleanup file entry before returning error
fp.Close() //nolint: errcheck, gosec
if e := os.Remove(file); e != nil {
klog.Warningf("file-state: fail to remove file %s: %s", file, e.Error())
klog.Warningf("file-state: failed to remove state file: %v", e)
}
return fmt.Errorf("file-state: failed to encode metadata: %w", err)
return fmt.Errorf("failed to encode metadata: %w", err)
}

if err := fp.Close(); err != nil {
return fmt.Errorf("file-state: failed to close metadata storage file %q: %w", file, err)
return fmt.Errorf("failed to close state file: %w", err)
}

return fs.syncStateDir()
Expand All @@ -91,7 +91,7 @@ func (fs *fileState) Delete(id string) error {

file := path.Join(fs.location, id+".json")
if err := os.Remove(file); err != nil && err != os.ErrNotExist {
return fmt.Errorf("file-state: failed to delete file %q: %w", file, err)
return fmt.Errorf("failed to delete state file: %w", err)
}

return fs.syncStateDir()
Expand All @@ -108,7 +108,7 @@ func (fs *fileState) GetAll() ([]string, error) {
files, err := ioutil.ReadDir(fs.location)
fs.stateDirLock.Unlock()
if err != nil {
return nil, fmt.Errorf("file-state: failed to read metadata from %q: %w", fs.location, err)
return nil, fmt.Errorf("failed to read metadata from %q: %w", fs.location, err)
}

ids := []string{}
Expand All @@ -131,7 +131,7 @@ func ensureLocation(directory string) error {
err = os.Mkdir(directory, 0750)
}
} else if !info.IsDir() {
err = fmt.Errorf("State location(%s) must be a directory", directory)
err = fmt.Errorf("state location %q must be a directory", directory)
}

return err
Expand All @@ -143,12 +143,12 @@ func (fs *fileState) readFileData(file string, dataPtr interface{}) error {

fp, err := os.OpenFile(file, os.O_RDONLY|os.O_SYNC, 0) //nolint: gosec
if err != nil {
return fmt.Errorf("file-state: failed to open file %q: %w", file, err)
return fmt.Errorf("failed to open state file: %w", err)
}
defer fp.Close() //nolint: errcheck

if err := json.NewDecoder(fp).Decode(dataPtr); err != nil {
return fmt.Errorf("file-state: failed to decode metadata from file %q: %w", file, err)
return fmt.Errorf("failed to decode metadata from file %q: %w", file, err)
}

return nil
Expand All @@ -160,12 +160,12 @@ func (fs *fileState) syncStateDir() error {
defer fs.stateDirLock.Unlock()

if fp, err := os.Open(fs.location); err != nil {
rErr = fmt.Errorf("file-state: failed to open state directory for syncing: %w", err)
rErr = fmt.Errorf("failed to open state directory for syncing: %w", err)
} else if err := fp.Sync(); err != nil {
fp.Close() //nolint: errcheck
rErr = fmt.Errorf("file-state: fsync failure on state directory: %w", err)
rErr = fmt.Errorf("fsync failure on state directory: %w", err)
} else if err := fp.Close(); err != nil {
rErr = fmt.Errorf("file-state: failed to close state directory after sync: %w", err)
rErr = fmt.Errorf("failed to close state directory after sync: %w", err)
}

return rErr
Expand Down
11 changes: 11 additions & 0 deletions test/e2e/storage/sanity.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,17 @@ fi
doall(false)
})
})

It("reports errors properly", func() {
for nodeName, node := range nodes {
_, err := node.cc.DeleteVolume(ctx, &csi.DeleteVolumeRequest{})
Expect(err).ToNot(BeNil(), "DeleteVolume with empty volume ID did not fail on node %s", nodeName)
Expect(err.Error()).To(ContainSubstring(nodeName+": "), "errors should contain node name")
status, ok := status.FromError(err)
Expect(ok).To(BeTrue(), "error type %T should have contained a gRPC status: %v", err, err)
Expect(status.Code().String()).To(Equal(codes.InvalidArgument.String()), "status code should be InvalidArgument")
}
})
})
})
})
Expand Down

0 comments on commit 9cc771f

Please sign in to comment.