check path existence and lock in NodeUnpublishVolume (#3284)
* check path existence and lock in NodeUnpublishVolume

Signed-off-by: wangshulin <[email protected]>

* use path level lock and lock for NodePublishVolume

Signed-off-by: wangshulin <[email protected]>

* small fix

Signed-off-by: wangshulin <[email protected]>

* return codes.aborted when the lock is not acquired

Signed-off-by: wangshulin <[email protected]>

* check target path validity

Signed-off-by: wangshulin <[email protected]>

* adjust order of check path and acquire lock

Signed-off-by: wangshulin <[email protected]>

---------

Signed-off-by: wangshulin <[email protected]>
wangshli authored and TrafalgarZZZ committed Sep 13, 2023
1 parent 987e6e7 commit 950369e
Showing 3 changed files with 98 additions and 16 deletions.
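
The diffs below apply the same guard to each node RPC: validate the request key (target path or volume ID), try to take a per-key lock, return codes.Aborted if another operation on that key is still in flight, and release the lock when done. A minimal sketch of that pattern, assuming the grpc-go status and codes packages; the package, volumeLocker, and guardedNodeOp names are hypothetical, introduced here for illustration only:

package nodeguard

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// volumeLocker captures the two methods the commit's utils.VolumeLocks exposes.
type volumeLocker interface {
	TryAcquire(id string) bool
	Release(id string)
}

// guardedNodeOp is a hypothetical helper (not from the commit) that validates the key,
// takes the per-key lock, and runs op while holding it. Returning codes.Aborted lets the
// caller retry the operation later instead of racing a concurrent call on the same key.
func guardedNodeOp(locks volumeLocker, key string, op func() error) error {
	if len(key) == 0 {
		return status.Error(codes.InvalidArgument, "request key is required")
	}
	if acquired := locks.TryAcquire(key); !acquired {
		return status.Errorf(codes.Aborted, "an operation on %s is already in progress", key)
	}
	defer locks.Release(key)
	return op()
}
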
1 change: 1 addition & 0 deletions pkg/csi/plugins/driver.go
@@ -92,6 +92,7 @@ func (d *driver) newNodeServer() *nodeServer {
		client:               d.client,
		apiReader:            d.apiReader,
		nodeAuthorizedClient: d.nodeAuthorizedClient,
		locks:                utils.NewVolumeLocks(),
	}
}

76 changes: 60 additions & 16 deletions pkg/csi/plugins/nodeserver.go
@@ -23,7 +23,6 @@ import (
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

@@ -58,14 +57,24 @@ type nodeServer struct {
	client               client.Client
	apiReader            client.Reader
	nodeAuthorizedClient *kubernetes.Clientset
	mutex                sync.Mutex
	locks                *utils.VolumeLocks
	node                 *v1.Node
}

func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {

	glog.Infof("NodePublishVolumeRequest is %v", req)
	targetPath := req.GetTargetPath()
	// check targetpath validity
	if len(targetPath) == 0 {
		return nil, status.Error(codes.InvalidArgument, "NodePublishVolume operation requires targetPath but is not provided")
	}

	// The lock is to avoid race condition
	if lock := ns.locks.TryAcquire(targetPath); !lock {
		return nil, status.Errorf(codes.Aborted, "NodePublishVolume operation on targetPath %s already exists", targetPath)
	}
	defer ns.locks.Release(targetPath)

	isMount, err := utils.IsMounted(targetPath)
	if err != nil {
@@ -166,6 +175,27 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis

func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
	targetPath := req.GetTargetPath()
	// check targetpath validity
	if len(targetPath) == 0 {
		return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume operation requires targetPath but is not provided")
	}

	// The lock is to avoid race condition
	if lock := ns.locks.TryAcquire(targetPath); !lock {
		return nil, status.Errorf(codes.Aborted, "NodeUnpublishVolume operation on targetPath %s already exists", targetPath)
	}
	defer ns.locks.Release(targetPath)

	// check path existence
	_, err := os.Stat(targetPath)
	// No need to unmount non-existing targetPath
	if os.IsNotExist(err) {
		glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up, so it doesn't need to be unmounted", targetPath)
		return &csi.NodeUnpublishVolumeResponse{}, nil
	}
	if err != nil {
		return nil, errors.Wrapf(err, "NodeUnpublishVolume: stat targetPath %s error %v", targetPath, err)
	}

	// targetPath may be mount bind many times when mount point recovered.
	// umount until it's not mounted.
@@ -191,7 +221,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
		}
	}

	err := mount.CleanupMountPoint(req.GetTargetPath(), mount.New(""), false)
	err = mount.CleanupMountPoint(req.GetTargetPath(), mount.New(""), false)
	if err != nil {
		glog.V(3).Infoln(err)
	} else {
@@ -202,22 +232,29 @@ func (ns *nodeServer) NodeUnpublishVolu
}

func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
	volumeId := req.GetVolumeId()
	if len(volumeId) == 0 {
		return nil, status.Error(codes.InvalidArgument, "NodeUnstageVolume operation requires volumeId but is not provided")
	}

	// The lock is to ensure CSI plugin labels the node in correct order
	ns.mutex.Lock()
	defer ns.mutex.Unlock()
	if lock := ns.locks.TryAcquire(volumeId); !lock {
		return nil, status.Errorf(codes.Aborted, "NodeUnstageVolume operation on volumeId %s already exists", volumeId)
	}
	defer ns.locks.Release(volumeId)

	// 1. get runtime namespace and name
	// A nil volumeContext is passed because unlike csi.NodeStageVolumeRequest, csi.NodeUnstageVolumeRequest has
	// no volume context attribute.
	namespace, name, err := ns.getRuntimeNamespacedName(nil, req.GetVolumeId())
	namespace, name, err := ns.getRuntimeNamespacedName(nil, volumeId)
	if err != nil {
		if utils.IgnoreNotFound(err) == nil {
			// For cases like the related persistent volume has been deleted, ignore it and return success
			glog.Warningf("NodeUnstageVolume: volume %s not found, maybe it's already cleaned up, ignore it", req.GetVolumeId())
			glog.Warningf("NodeUnstageVolume: volume %s not found, maybe it's already cleaned up, ignore it", volumeId)
			return &csi.NodeUnstageVolumeResponse{}, nil
		}
		glog.Errorf("NodeUnstageVolume: can't get runtime namespace and name given (volumeContext: nil, volumeId: %s): %v", req.GetVolumeId(), err)
		return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get namespace and name by volume id %s", req.GetVolumeId())
		glog.Errorf("NodeUnstageVolume: can't get runtime namespace and name given (volumeContext: nil, volumeId: %s): %v", volumeId, err)
		return nil, errors.Wrapf(err, "NodeUnstageVolume: can't get namespace and name by volume id %s", volumeId)
	}

	// 2. Check fuse clean policy. If clean policy is set to OnRuntimeDeleted, there is no
@@ -249,7 +286,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
	}

	// 3. check if the path is mounted
	inUse, err := checkMountInUse(req.GetVolumeId())
	inUse, err := checkMountInUse(volumeId)
	if err != nil {
		return nil, errors.Wrap(err, "NodeUnstageVolume: can't check mount in use")
	}
@@ -282,10 +319,17 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}

func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
	volumeId := req.GetVolumeId()
	if len(volumeId) == 0 {
		return nil, status.Error(codes.InvalidArgument, "NodeStageVolume operation requires volumeId but is not provided")
	}

	// The lock is to ensure CSI plugin labels the node in correct order
	ns.mutex.Lock()
	defer ns.mutex.Unlock()
	glog.Infof("NodeStageVolume: Starting NodeStage with VolumeId: %s, and VolumeContext: %v", req.GetVolumeId(), req.VolumeContext)
	if lock := ns.locks.TryAcquire(volumeId); !lock {
		return nil, status.Errorf(codes.Aborted, "NodeStageVolume operation on volumeId %s already exists", volumeId)
	}
	defer ns.locks.Release(volumeId)
	glog.Infof("NodeStageVolume: Starting NodeStage with VolumeId: %s, and VolumeContext: %v", volumeId, req.VolumeContext)

	// 1. Start SessMgr Pod and wait for ready if FUSE pod requires SessMgr
	sessMgrWorkDir := req.GetVolumeContext()[common.VolumeAttrEFCSessMgrWorkDir]
@@ -303,10 +347,10 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
	}

	// 3. get runtime namespace and name
	namespace, name, err := ns.getRuntimeNamespacedName(req.GetVolumeContext(), req.GetVolumeId())
	namespace, name, err := ns.getRuntimeNamespacedName(req.GetVolumeContext(), volumeId)
	if err != nil {
		glog.Errorf("NodeStageVolume: can't get runtime namespace and name given (volumeContext: %v, volumeId: %s): %v", req.GetVolumeContext(), req.GetVolumeId(), err)
		return nil, errors.Wrapf(err, "NodeStageVolume: can't get namespace and name by volume id %s", req.GetVolumeId())
		return nil, errors.Wrapf(err, "NodeStageVolume: can't get namespace and name by volume id %s", volumeId)
	}

	// 4. Label node to launch FUSE Pod
@@ -327,7 +371,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
		return nil, errors.Wrapf(err, "NodeStageVolume: error when patching labels on node %s", ns.nodeId)
	}

	glog.Infof("NodeStageVolume: NodeStage succeded with VolumeId: %s, and added NodeLabel: %s", req.GetVolumeId(), fuseLabelKey)
	glog.Infof("NodeStageVolume: NodeStage succeded with VolumeId: %s, and added NodeLabel: %s", volumeId, fuseLabelKey)
	return &csi.NodeStageVolumeResponse{}, nil
}

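The existence check added to NodeUnpublishVolume above keeps the RPC idempotent: if the target path is already gone there is nothing to unmount, so the call returns success rather than an error, and only unexpected stat failures are surfaced. A standalone restatement of that check as a sketch; shouldUnmount and the example path are hypothetical and not part of the commit:

package main

import (
	"fmt"
	"os"
)

// shouldUnmount reports whether targetPath still exists and therefore needs unmounting.
// A missing path means a previous call already cleaned it up, so the RPC can return success.
func shouldUnmount(targetPath string) (bool, error) {
	_, err := os.Stat(targetPath)
	if os.IsNotExist(err) {
		return false, nil
	}
	if err != nil {
		// Any other stat error is surfaced to the caller.
		return false, fmt.Errorf("stat %s: %w", targetPath, err)
	}
	return true, nil
}

func main() {
	// A path that does not exist is simply skipped rather than treated as an error.
	ok, err := shouldUnmount("/var/lib/kubelet/pods/does-not-exist/volume")
	fmt.Println(ok, err) // false <nil>
}
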
37 changes: 37 additions & 0 deletions pkg/utils/volume_lock.go
@@ -0,0 +1,37 @@
package utils

import (
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
)

type VolumeLocks struct {
	locks sets.String
	mutex sync.Mutex
}

func NewVolumeLocks() *VolumeLocks {
	return &VolumeLocks{
		locks: sets.NewString(),
	}
}

// TryAcquire tries to acquire the lock for operating on resourceID and returns true if successful.
// If another operation is already using resourceID, returns false.
func (lock *VolumeLocks) TryAcquire(volumeID string) bool {
	lock.mutex.Lock()
	defer lock.mutex.Unlock()
	if lock.locks.Has(volumeID) {
		return false
	}
	lock.locks.Insert(volumeID)
	return true
}

// Release releases lock in volume level
func (lock *VolumeLocks) Release(volumeID string) {
	lock.mutex.Lock()
	defer lock.mutex.Unlock()
	lock.locks.Delete(volumeID)
}
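
A small usage sketch of the new VolumeLocks type, written as a Go example test that would sit alongside the file above in the same utils package (the test file and volume IDs are assumptions, not part of the commit): a second TryAcquire on a held ID fails until Release is called, while other IDs are unaffected.

package utils

import "fmt"

func ExampleVolumeLocks() {
	locks := NewVolumeLocks()

	fmt.Println(locks.TryAcquire("vol-1")) // first caller gets the lock
	fmt.Println(locks.TryAcquire("vol-1")) // same volume is busy, so the RPC would return codes.Aborted
	fmt.Println(locks.TryAcquire("vol-2")) // locks are per volume ID, other volumes proceed

	locks.Release("vol-1")
	fmt.Println(locks.TryAcquire("vol-1")) // available again after release

	// Output:
	// true
	// false
	// true
	// true
}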
