CSI: include volume namespace in staging path
CSI volumes are namespaced, but the client does not include the namespace in
the staging mount path. This causes CSI volumes with the same volume ID but
different namespaces to collide if they happen to be placed on the same host.

Fixes: #18741
tgross committed May 3, 2024
1 parent 54fc146 commit ea3f559
Showing 9 changed files with 78 additions and 49 deletions.
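
To illustrate the collision described in the commit message, here is a minimal, standalone sketch of the staging path layout before and after this change. The mount root, the "staging" directory name, and the usage suffix are illustrative stand-ins for the client's actual mountRoot, StagingDirName, and UsageOptions.ToFS() values:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	root := "/var/nomad/client/csi/node/aws-ebs0" // illustrative mount root
	usage := "rw-file-system-single-node-writer"  // illustrative UsageOptions.ToFS() output

	// Before this commit: the namespace is not a path component, so a volume
	// registered as "vol-db" in both the "prod" and "dev" namespaces stages to
	// the same directory when placed on the same host.
	before := filepath.Join(root, "staging", "vol-db", usage)

	// After this commit: the volume namespace is part of the staging path, so
	// the two volumes no longer collide.
	afterProd := filepath.Join(root, "staging", "prod", "vol-db", usage)
	afterDev := filepath.Join(root, "staging", "dev", "vol-db", usage)

	fmt.Println(before)
	fmt.Println(afterProd)
	fmt.Println(afterDev)
}
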
4 changes: 3 additions & 1 deletion client/allocrunner/csi_hook.go
@@ -323,6 +323,7 @@ func (c *csiHook) claimVolumes(results map[string]*volumePublishResult) error {

// populate data we'll write later to disk
result.stub.VolumeID = resp.Volume.ID
result.stub.VolumeNamespace = resp.Volume.Namespace
result.stub.VolumeExternalID = resp.Volume.RemoteID()
result.stub.PluginID = resp.Volume.PluginID
result.publishContext = resp.PublishContext
@@ -532,7 +533,8 @@ func (c *csiHook) unmountImpl(result *volumePublishResult) error {
}

return manager.UnmountVolume(c.shutdownCtx,
result.stub.VolumeID, result.stub.VolumeExternalID, c.alloc.ID, usageOpts)
result.stub.VolumeNamespace, result.stub.VolumeID,
result.stub.VolumeExternalID, c.alloc.ID, usageOpts)
}

// Shutdown will get called when the client is gracefully
1 change: 1 addition & 0 deletions client/allocrunner/state/state.go
@@ -86,6 +86,7 @@ type AllocVolumes struct {
// relevant data that we need to persist about the volume.
type CSIVolumeStub struct {
VolumeID string
VolumeNamespace string
VolumeExternalID string
PluginID string
ExternalNodeID string
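
The new VolumeNamespace field above matters because unmounting can happen long after mounting, possibly after a client restart, so the namespace has to be recoverable from persisted state rather than from the original request. A rough sketch of the idea, using a simplified local mirror of the stub struct and hypothetical values:

package main

import "fmt"

// CSIVolumeStub here is a simplified mirror of client/allocrunner/state.CSIVolumeStub,
// shown only to illustrate the data the client persists per claimed volume.
type CSIVolumeStub struct {
	VolumeID         string
	VolumeNamespace  string // new in this commit
	VolumeExternalID string
	PluginID         string
	ExternalNodeID   string
}

func main() {
	// Hypothetical values: after claiming, the hook records the namespace so a
	// later UnmountVolume (possibly after a client restart) can rebuild the
	// namespaced staging path.
	stub := CSIVolumeStub{
		VolumeID:         "vol-db",
		VolumeNamespace:  "prod",
		VolumeExternalID: "csi-0123",
		PluginID:         "aws-ebs0",
	}
	fmt.Printf("%+v\n", stub)
}
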
4 changes: 2 additions & 2 deletions client/csi_endpoint.go
@@ -527,7 +527,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
AccessMode: req.AccessMode,
}

err = manager.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
err = manager.UnmountVolume(ctx, req.VolumeNamespace, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the unmounting previously happened but the server failed to
// checkpoint, we'll get an error from Unmount but can safely
@@ -565,7 +565,7 @@ func (c *CSI) NodeExpandVolume(req *structs.ClientCSINodeExpandVolumeRequest, re
return err
}

newCapacity, err := manager.ExpandVolume(ctx,
newCapacity, err := manager.ExpandVolume(ctx, req.VolumeNamespace,
req.VolumeID, req.ExternalID, req.Claim.AllocationID, usageOpts, req.Capacity)

if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
4 changes: 2 additions & 2 deletions client/pluginmanager/csimanager/interface.go
@@ -56,9 +56,9 @@ func (u *UsageOptions) ToFS() string {

type VolumeManager interface {
MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error)
UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error
UnmountVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions) error
HasMount(ctx context.Context, mountInfo *MountInfo) (bool, error)
ExpandVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error)
ExpandVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error)
ExternalID() string
}

20 changes: 10 additions & 10 deletions client/pluginmanager/csimanager/testing.go
@@ -55,8 +55,8 @@ type MockVolumeManager struct {
LastExpandVolumeCall *MockExpandVolumeCall
}

func (m *MockVolumeManager) mountName(volID, allocID string, usageOpts *UsageOptions) string {
return filepath.Join("test-alloc-dir", allocID, volID, usageOpts.ToFS())
func (m *MockVolumeManager) mountName(volNS, volID, allocID string, usageOpts *UsageOptions) string {
return filepath.Join("test-alloc-dir", allocID, volNS, volID, usageOpts.ToFS())
}

func (m *MockVolumeManager) MountVolume(_ context.Context, vol *nstructs.CSIVolume, alloc *nstructs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error) {
@@ -74,15 +74,15 @@ func (m *MockVolumeManager) MountVolume(_ context.Context, vol *nstructs.CSIVolu
if m.Mounts == nil {
m.Mounts = make(map[string]bool)
}
source := m.mountName(vol.ID, alloc.ID, usageOpts)
source := m.mountName(vol.Namespace, vol.ID, alloc.ID, usageOpts)
m.Mounts[source] = true

return &MountInfo{
Source: source,
}, nil
}

func (m *MockVolumeManager) UnmountVolume(_ context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error {
func (m *MockVolumeManager) UnmountVolume(_ context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions) error {
if m.CallCounter != nil {
m.CallCounter.Inc("UnmountVolume")
}
@@ -94,7 +94,7 @@ func (m *MockVolumeManager) UnmountVolume(_ context.Context, volID, remoteID, al
}

// "unmount" it
delete(m.Mounts, m.mountName(volID, allocID, usageOpts))
delete(m.Mounts, m.mountName(volNS, volID, allocID, usageOpts))
return nil
}

@@ -108,17 +108,17 @@ func (m *MockVolumeManager) HasMount(_ context.Context, mountInfo *MountInfo) (b
return m.Mounts[mountInfo.Source], nil
}

func (m *MockVolumeManager) ExpandVolume(_ context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error) {
func (m *MockVolumeManager) ExpandVolume(_ context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error) {
m.LastExpandVolumeCall = &MockExpandVolumeCall{
volID, remoteID, allocID, usageOpts, capacity,
volNS, volID, remoteID, allocID, usageOpts, capacity,
}
return capacity.RequiredBytes, m.NextExpandVolumeErr
}

type MockExpandVolumeCall struct {
VolID, RemoteID, AllocID string
UsageOpts *UsageOptions
Capacity *csi.CapacityRange
VolNS, VolID, RemoteID, AllocID string
UsageOpts *UsageOptions
Capacity *csi.CapacityRange
}

func (m *MockVolumeManager) ExternalID() string {
42 changes: 31 additions & 11 deletions client/pluginmanager/csimanager/volume.go
@@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
@@ -73,8 +74,8 @@ func newVolumeManager(logger hclog.Logger, eventer TriggerNodeEvent, plugin csi.
}
}

func (v *volumeManager) stagingDirForVolume(root string, volID string, usage *UsageOptions) string {
return filepath.Join(root, StagingDirName, volID, usage.ToFS())
func (v *volumeManager) stagingDirForVolume(root string, volNS, volID string, usage *UsageOptions) string {
return filepath.Join(root, StagingDirName, volNS, volID, usage.ToFS())
}

func (v *volumeManager) allocDirForVolume(root string, volID, allocID string) string {
@@ -92,7 +93,7 @@ func (v *volumeManager) targetForVolume(root string, volID, allocID string, usag
// Returns whether the directory is a pre-existing mountpoint, the staging path,
// and any errors that occurred.
func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume, usage *UsageOptions) (string, bool, error) {
stagingPath := v.stagingDirForVolume(v.mountRoot, vol.ID, usage)
stagingPath := v.stagingDirForVolume(v.mountRoot, vol.Namespace, vol.ID, usage)

// Make the staging path, owned by the Nomad User
if err := os.MkdirAll(stagingPath, 0700); err != nil && !os.IsExist(err) {
@@ -167,7 +168,7 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume,
if err != nil {
return err
}
pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol.ID, usage)
pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol.Namespace, vol.ID, usage)

logger.Trace("Volume staging environment", "pre-existing_mount", isMount, "host_staging_path", hostStagingPath, "plugin_staging_path", pluginStagingPath)

@@ -203,7 +204,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
logger := hclog.FromContext(ctx)
var pluginStagingPath string
if v.requiresStaging {
pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol.ID, usage)
pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol.Namespace, vol.ID, usage)
}

hostTargetPath, isMount, err := v.ensureAllocDir(vol, alloc, usage)
@@ -284,10 +285,19 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume,
// once for each staging path that a volume has been staged under.
// It is safe to call multiple times and a plugin is required to return OK if
// the volume has been unstaged or was never staged on the node.
func (v *volumeManager) unstageVolume(ctx context.Context, volID, remoteID string, usage *UsageOptions) error {
func (v *volumeManager) unstageVolume(ctx context.Context, volNS, volID, remoteID string, usage *UsageOptions) error {
logger := hclog.FromContext(ctx)
logger.Trace("Unstaging volume")
stagingPath := v.stagingDirForVolume(v.containerMountPoint, volID, usage)

stagingPath := v.stagingDirForVolume(v.containerMountPoint, volNS, volID, usage)
_, err := os.Stat(stagingPath)
if err != nil && errors.Is(err, fs.ErrNotExist) {
// COMPAT: it's possible to get an unmount request that includes the
// namespace even for volumes that were mounted before the path included
// the namespace, so if the staging path doesn't exist, try the older
// path
stagingPath = v.stagingDirForVolume(v.containerMountPoint, "", volID, usage)
}

// CSI NodeUnstageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
@@ -349,7 +359,7 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al
return fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr)
}

func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) {
func (v *volumeManager) UnmountVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usage *UsageOptions) (err error) {
logger := v.logger.With("volume_id", volID, "alloc_id", allocID)
ctx = hclog.WithContext(ctx, logger)

@@ -358,7 +368,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
canRelease := v.usageTracker.Free(allocID, volID, usage)
if v.requiresStaging && canRelease {
err = v.unstageVolume(ctx, volID, remoteID, usage)
err = v.unstageVolume(ctx, volNS, volID, remoteID, usage)
}
}

@@ -384,20 +394,30 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
}

// ExpandVolume sends a NodeExpandVolume request to the node plugin
func (v *volumeManager) ExpandVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions, capacity *csi.CapacityRange) (newCapacity int64, err error) {
func (v *volumeManager) ExpandVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usage *UsageOptions, capacity *csi.CapacityRange) (newCapacity int64, err error) {
capability, err := csi.VolumeCapabilityFromStructs(usage.AttachmentMode, usage.AccessMode, usage.MountOptions)
if err != nil {
// nil may be acceptable, so let the node plugin decide.
v.logger.Warn("ExpandVolume: unable to detect volume capability",
"volume_id", volID, "alloc_id", allocID, "error", err)
}

stagingPath := v.stagingDirForVolume(v.containerMountPoint, volNS, volID, usage)
_, err = os.Stat(stagingPath)
if err != nil && errors.Is(err, fs.ErrNotExist) {
// COMPAT: it's possible to get an unmount request that includes the
// namespace even for volumes that were mounted before the path included
// the namespace, so if the staging path doesn't exist, try the older
// path
stagingPath = v.stagingDirForVolume(v.containerMountPoint, "", volID, usage)
}

req := &csi.NodeExpandVolumeRequest{
ExternalVolumeID: remoteID,
CapacityRange: capacity,
Capability: capability,
TargetPath: v.targetForVolume(v.containerMountPoint, volID, allocID, usage),
StagingPath: v.stagingDirForVolume(v.containerMountPoint, volID, usage),
StagingPath: stagingPath,
}
resp, err := v.plugin.NodeExpandVolume(ctx, req,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
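
The COMPAT blocks in unstageVolume and ExpandVolume above keep in-flight volumes working across an upgrade: the client first looks for the new namespaced staging directory and, if it does not exist, falls back to the pre-upgrade path that omits the namespace. A standalone sketch of that fallback pattern (function and directory names here are hypothetical, not the package's actual helpers):

package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// stagingDirFor mirrors the shape of stagingDirForVolume: namespace and volume
// ID are both path components; an empty namespace reproduces the legacy layout.
func stagingDirFor(root, volNS, volID, usageDir string) string {
	return filepath.Join(root, "staging", volNS, volID, usageDir)
}

// resolveStagingPath prefers the namespaced path but falls back to the legacy
// (pre-namespace) path when the namespaced directory does not exist on disk.
func resolveStagingPath(root, volNS, volID, usageDir string) string {
	path := stagingDirFor(root, volNS, volID, usageDir)
	if _, err := os.Stat(path); err != nil && errors.Is(err, fs.ErrNotExist) {
		path = stagingDirFor(root, "", volID, usageDir)
	}
	return path
}

func main() {
	fmt.Println(resolveStagingPath("/var/nomad/csi", "prod", "vol-db", "rw-file-system-single-node-writer"))
}
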
6 changes: 4 additions & 2 deletions client/pluginmanager/csimanager/volume_test.go
@@ -94,7 +94,8 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) {
eventer := func(e *structs.NodeEvent) {}
manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake,
tmpPath, tmpPath, true, "i-example")
expectedStagingPath := manager.stagingDirForVolume(tmpPath, tc.Volume.ID, tc.UsageOptions)
expectedStagingPath := manager.stagingDirForVolume(tmpPath,
tc.Volume.Namespace, tc.Volume.ID, tc.UsageOptions)

if tc.CreateDirAheadOfTime {
err := os.MkdirAll(expectedStagingPath, 0700)
@@ -258,6 +259,7 @@ func TestVolumeManager_unstageVolume(t *testing.T) {
ctx := context.Background()

err := manager.unstageVolume(ctx,
tc.Volume.Namespace,
tc.Volume.ID, tc.Volume.RemoteID(), tc.UsageOptions)

if tc.ExpectedErr != nil {
@@ -514,7 +516,7 @@ func TestVolumeManager_MountVolumeEvents(t *testing.T) {
require.Equal(t, "true", e.Details["success"])
events = events[1:]

err = manager.UnmountVolume(ctx, vol.ID, vol.RemoteID(), alloc.ID, usage)
err = manager.UnmountVolume(ctx, vol.Namespace, vol.ID, vol.RemoteID(), alloc.ID, usage)
require.NoError(t, err)

require.Equal(t, 1, len(events))
18 changes: 10 additions & 8 deletions client/structs/csi.go
@@ -440,11 +440,12 @@ type ClientCSIControllerListSnapshotsResponse struct {
// a Nomad client to tell a CSI node plugin on that client to perform
// NodeUnpublish and NodeUnstage.
type ClientCSINodeDetachVolumeRequest struct {
PluginID string // ID of the plugin that manages the volume (required)
VolumeID string // ID of the volume to be unpublished (required)
AllocID string // ID of the allocation we're unpublishing for (required)
NodeID string // ID of the Nomad client targeted
ExternalID string // External ID of the volume to be unpublished (required)
PluginID string // ID of the plugin that manages the volume (required)
VolumeID string // ID of the volume to be unpublished (required)
VolumeNamespace string // Namespace of the volume to be unpublished (required)
AllocID string // ID of the allocation we're unpublishing for (required)
NodeID string // ID of the Nomad client targeted
ExternalID string // External ID of the volume to be unpublished (required)

// These fields should match the original volume request so that
// we can find the mount points on the client
@@ -459,9 +460,10 @@ type ClientCSINodeDetachVolumeResponse struct{}
// a Nomad client to tell a CSI node plugin on that client to perform
// NodeExpandVolume.
type ClientCSINodeExpandVolumeRequest struct {
PluginID string // ID of the plugin that manages the volume (required)
VolumeID string // ID of the volume to be expanded (required)
ExternalID string // External ID of the volume to be expanded (required)
PluginID string // ID of the plugin that manages the volume (required)
VolumeID string // ID of the volume to be expanded (required)
VolumeNamespace string // Namespace of the volume to be expanded (required)
ExternalID string // External ID of the volume to be expanded (required)

// Capacity range (required) to be sent to the node plugin
Capacity *csi.CapacityRange
28 changes: 15 additions & 13 deletions nomad/csi_endpoint.go
@@ -845,14 +845,15 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc
}

req := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: vol.PluginID,
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: claim.AttachmentMode,
AccessMode: claim.AccessMode,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
PluginID: vol.PluginID,
VolumeID: vol.ID,
VolumeNamespace: vol.Namespace,
ExternalID: vol.RemoteID(),
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: claim.AttachmentMode,
AccessMode: claim.AccessMode,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
}
err := v.srv.RPC("ClientCSI.NodeDetachVolume",
req, &cstructs.ClientCSINodeDetachVolumeResponse{})
@@ -1295,11 +1296,12 @@ func (v *CSIVolume) nodeExpandVolume(vol *structs.CSIVolume, plugin *structs.CSI

resp := &cstructs.ClientCSINodeExpandVolumeResponse{}
req := &cstructs.ClientCSINodeExpandVolumeRequest{
PluginID: plugin.ID,
VolumeID: vol.ID,
ExternalID: vol.ExternalID,
Capacity: capacity,
Claim: claim,
PluginID: plugin.ID,
VolumeID: vol.ID,
VolumeNamespace: vol.Namespace,
ExternalID: vol.ExternalID,
Capacity: capacity,
Claim: claim,
}
if err := v.srv.RPC("ClientCSI.NodeExpandVolume", req, resp); err != nil {
mErr.Errors = append(mErr.Errors, err)
