diff --git a/charts/aws-ebs-csi-driver/Chart.yaml b/charts/aws-ebs-csi-driver/Chart.yaml index 12deabf6b8..b8ac4d624e 100644 --- a/charts/aws-ebs-csi-driver/Chart.yaml +++ b/charts/aws-ebs-csi-driver/Chart.yaml @@ -2,7 +2,7 @@ apiVersion: v1 appVersion: "0.9.0" name: aws-ebs-csi-driver description: A Helm chart for AWS EBS CSI Driver -version: 0.9.7 +version: 0.10.0 kubeVersion: ">=1.17.0-0" home: https://github.com/kubernetes-sigs/aws-ebs-csi-driver sources: diff --git a/charts/aws-ebs-csi-driver/templates/controller.yaml b/charts/aws-ebs-csi-driver/templates/controller.yaml index aa6a500cb5..00ea83e5cc 100644 --- a/charts/aws-ebs-csi-driver/templates/controller.yaml +++ b/charts/aws-ebs-csi-driver/templates/controller.yaml @@ -182,6 +182,12 @@ spec: {{- with .Values.resources }} resources: {{ toYaml . | nindent 12 }} {{- end }} + {{- if .Values.imagePullSecrets }} + imagePullSecrets: + {{- range .Values.imagePullSecrets }} + - name: {{ . }} + {{- end }} + {{- end }} volumes: - name: socket-dir emptyDir: {} diff --git a/charts/aws-ebs-csi-driver/templates/node.yaml b/charts/aws-ebs-csi-driver/templates/node.yaml index 4a08ad6209..949fc5ad19 100644 --- a/charts/aws-ebs-csi-driver/templates/node.yaml +++ b/charts/aws-ebs-csi-driver/templates/node.yaml @@ -134,6 +134,12 @@ spec: resources: {{ toYaml . | nindent 12 }} {{- end }} {{- end }} + {{- if .Values.imagePullSecrets }} + imagePullSecrets: + {{- range .Values.imagePullSecrets }} + - name: {{ . }} + {{- end }} + {{- end }} volumes: - name: kubelet-dir hostPath: diff --git a/charts/aws-ebs-csi-driver/templates/statefulset.yaml b/charts/aws-ebs-csi-driver/templates/statefulset.yaml index 1a28857e14..d73186828b 100644 --- a/charts/aws-ebs-csi-driver/templates/statefulset.yaml +++ b/charts/aws-ebs-csi-driver/templates/statefulset.yaml @@ -43,4 +43,10 @@ spec: args: - --v=5 - --leader-election=false + {{- if .Values.imagePullSecrets }} + imagePullSecrets: + {{- range .Values.imagePullSecrets }} + - name: {{ . }} + {{- end }} + {{- end }} {{- end }} diff --git a/charts/aws-ebs-csi-driver/templates/storageclass.yaml b/charts/aws-ebs-csi-driver/templates/storageclass.yaml index c69af7dc63..3f9db8ad0e 100644 --- a/charts/aws-ebs-csi-driver/templates/storageclass.yaml +++ b/charts/aws-ebs-csi-driver/templates/storageclass.yaml @@ -3,6 +3,12 @@ kind: StorageClass apiVersion: storage.k8s.io/v1 metadata: name: {{ .name }} + {{- if .annotations }} + annotations: {{- .annotations | toYaml | trim | nindent 4 }} + {{- end }} + {{- if .labels }} + labels: {{- .labels | toYaml | trim | nindent 4 }} + {{- end }} provisioner: ebs.csi.aws.com -{{ omit (dict "volumeBindingMode" "WaitForFirstConsumer" | merge .) "name" | toYaml }} +{{ omit (dict "volumeBindingMode" "WaitForFirstConsumer" | merge .) "name" "annotations" "labels" | toYaml }} {{- end }} diff --git a/charts/aws-ebs-csi-driver/values.yaml b/charts/aws-ebs-csi-driver/values.yaml index ca318bab09..3cad7a1c38 100644 --- a/charts/aws-ebs-csi-driver/values.yaml +++ b/charts/aws-ebs-csi-driver/values.yaml @@ -40,7 +40,7 @@ fullnameOverride: "" podAnnotations: {} # True if enable volume scheduling for dynamic volume provisioning -enableVolumeScheduling: false +enableVolumeScheduling: true # True if enable volume resizing enableVolumeResizing: false @@ -114,6 +114,12 @@ serviceAccount: storageClasses: [] # Add StorageClass resources like: # - name: ebs-sc +# # annotation metadata +# annotations: +# storageclass.kubernetes.io/is-default-class: "true" +# # label metadata +# labels: +# my-label-is: supercool # # defaults to WaitForFirstConsumer # volumeBindingMode: WaitForFirstConsumer # # defaults to Delete diff --git a/hack/e2e/run.sh b/hack/e2e/run.sh index 12db9c227a..595d5ef534 100755 --- a/hack/e2e/run.sh +++ b/hack/e2e/run.sh @@ -40,7 +40,7 @@ AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) IMAGE_NAME=${IMAGE_NAME:-${AWS_ACCOUNT_ID}.dkr.ecr.${REGION}.amazonaws.com/${DRIVER_NAME}} IMAGE_TAG=${IMAGE_TAG:-${TEST_ID}} -K8S_VERSION=${K8S_VERSION:-1.18.10} +K8S_VERSION=${K8S_VERSION:-1.18.16} KOPS_VERSION=${KOPS_VERSION:-1.18.2} KOPS_STATE_FILE=${KOPS_STATE_FILE:-s3://k8s-kops-csi-e2e} KOPS_FEATURE_GATES_FILE=${KOPS_FEATURE_GATES_FILE:-./hack/feature-gates.yaml} diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index f0aff469b4..0b82989604 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -27,6 +27,7 @@ import ( csi "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/protobuf/ptypes" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/internal" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -56,6 +57,7 @@ var ( // controllerService represents the controller service of CSI driver type controllerService struct { cloud cloud.Cloud + inFlight *internal.InFlight driverOptions *DriverOptions } @@ -87,6 +89,7 @@ func newControllerService(driverOptions *DriverOptions) controllerService { return controllerService{ cloud: cloud, + inFlight: internal.NewInFlight(), driverOptions: driverOptions, } } @@ -201,6 +204,13 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol return newCreateVolumeResponse(disk), nil } + // check if a request is already in-flight because the CreateVolume API is not idempotent + if ok := d.inFlight.Insert(req.String()); !ok { + msg := fmt.Sprintf("Create volume request for %s is already in progress", volName) + return nil, status.Error(codes.Aborted, msg) + } + defer d.inFlight.Delete(req.String()) + // create a new volume zone := pickAvailabilityZone(req.GetAccessibilityRequirements()) outpostArn := getOutpostArn(req.GetAccessibilityRequirements()) diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index aa16c7cdcd..55878689c5 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -31,6 +31,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/mock/gomock" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/internal" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/mocks" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util" "google.golang.org/grpc/codes" @@ -198,6 +199,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -268,6 +270,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -335,6 +338,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -390,6 +394,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -445,6 +450,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -472,6 +478,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -526,6 +533,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -612,6 +620,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -675,6 +684,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -736,6 +746,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -792,6 +803,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -834,6 +846,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -876,6 +889,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -917,6 +931,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -958,6 +973,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -999,6 +1015,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1041,6 +1058,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1076,6 +1094,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1116,6 +1135,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1156,6 +1176,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1230,6 +1251,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1309,7 +1331,8 @@ func TestCreateVolume(t *testing.T) { mockCloud.EXPECT().CreateDisk(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(diskOptions)).Return(mockDisk, nil) awsDriver := controllerService{ - cloud: mockCloud, + cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{ extraTags: map[string]string{ extraVolumeTagKey: extraVolumeTagValue, @@ -1370,7 +1393,8 @@ func TestCreateVolume(t *testing.T) { mockCloud.EXPECT().CreateDisk(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(diskOptions)).Return(mockDisk, nil) awsDriver := controllerService{ - cloud: mockCloud, + cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{ kubernetesClusterID: clusterID, }, @@ -1437,6 +1461,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1472,6 +1497,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1489,6 +1515,48 @@ func TestCreateVolume(t *testing.T) { } }, }, + { + name: "fail with in-flight request", + testFunc: func(t *testing.T) { + req := &csi.CreateVolumeRequest{ + Name: "random-vol-name", + CapacityRange: stdCapRange, + VolumeCapabilities: stdVolCap, + Parameters: nil, + } + + ctx := context.Background() + + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockCloud := mocks.NewMockCloud(mockCtl) + mockCloud.EXPECT().GetDiskByName(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(stdVolSize)).Return(nil, cloud.ErrNotFound) + + inFlight := internal.NewInFlight() + inFlight.Insert(req.String()) + defer inFlight.Delete(req.String()) + + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: inFlight, + driverOptions: &DriverOptions{}, + } + + _, err := awsDriver.CreateVolume(ctx, req) + if err == nil { + t.Fatalf("Expected CreateVolume to fail but got no error") + } + + srvErr, ok := status.FromError(err) + if !ok { + t.Fatalf("Could not get error status code from error: %v", srvErr) + } + if srvErr.Code() != codes.Aborted { + t.Fatalf("Expected Aborted but got: %s", srvErr.Code()) + } + }, + }, } for _, tc := range testCases { @@ -1517,6 +1585,7 @@ func TestDeleteVolume(t *testing.T) { mockCloud.EXPECT().DeleteDisk(gomock.Eq(ctx), gomock.Eq(req.VolumeId)).Return(true, nil) awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.DeleteVolume(ctx, req) @@ -1548,6 +1617,7 @@ func TestDeleteVolume(t *testing.T) { mockCloud.EXPECT().DeleteDisk(gomock.Eq(ctx), gomock.Eq(req.VolumeId)).Return(false, cloud.ErrNotFound) awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.DeleteVolume(ctx, req) @@ -1578,6 +1648,7 @@ func TestDeleteVolume(t *testing.T) { mockCloud.EXPECT().DeleteDisk(gomock.Eq(ctx), gomock.Eq(req.VolumeId)).Return(false, fmt.Errorf("DeleteDisk could not delete volume")) awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.DeleteVolume(ctx, req) @@ -1837,6 +1908,7 @@ func TestCreateSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.CreateSnapshot(context.Background(), req) @@ -1891,7 +1963,8 @@ func TestCreateSnapshot(t *testing.T) { mockCloud.EXPECT().GetSnapshotByName(gomock.Eq(ctx), gomock.Eq(req.GetName())).Return(nil, cloud.ErrNotFound) awsDriver := controllerService{ - cloud: mockCloud, + cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{ kubernetesClusterID: clusterID, }, @@ -1944,7 +2017,8 @@ func TestCreateSnapshot(t *testing.T) { mockCloud.EXPECT().GetSnapshotByName(gomock.Eq(ctx), gomock.Eq(req.GetName())).Return(nil, cloud.ErrNotFound) awsDriver := controllerService{ - cloud: mockCloud, + cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{ extraTags: map[string]string{ extraVolumeTagKey: extraVolumeTagValue, @@ -1976,6 +2050,7 @@ func TestCreateSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } if _, err := awsDriver.CreateSnapshot(context.Background(), req); err != nil { @@ -2024,6 +2099,7 @@ func TestCreateSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.CreateSnapshot(context.Background(), req) @@ -2090,6 +2166,7 @@ func TestCreateSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.CreateSnapshot(context.Background(), req) @@ -2131,6 +2208,7 @@ func TestDeleteSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2155,6 +2233,7 @@ func TestDeleteSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2211,6 +2290,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2237,6 +2317,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2272,6 +2353,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2301,6 +2383,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2330,6 +2413,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2361,6 +2445,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2423,6 +2508,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2453,7 +2539,11 @@ func TestControllerPublishVolume(t *testing.T) { mockCloud := mocks.NewMockCloud(mockCtl) mockCloud.EXPECT().DetachDisk(gomock.Eq(ctx), req.VolumeId, req.NodeId).Return(cloud.ErrNotFound) - awsDriver := controllerService{cloud: mockCloud} + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{}, + } resp, err := awsDriver.ControllerUnpublishVolume(ctx, req) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -2478,6 +2568,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2510,6 +2601,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2543,6 +2635,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2581,6 +2674,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2616,6 +2710,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2652,6 +2747,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2693,6 +2789,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2743,6 +2840,7 @@ func TestControllerUnpublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2770,6 +2868,7 @@ func TestControllerUnpublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2802,6 +2901,7 @@ func TestControllerUnpublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2881,6 +2981,7 @@ func TestControllerExpandVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } diff --git a/pkg/driver/internal/inflight.go b/pkg/driver/internal/inflight.go index f987918e6e..5f0d2a9ad7 100644 --- a/pkg/driver/internal/inflight.go +++ b/pkg/driver/internal/inflight.go @@ -17,6 +17,7 @@ limitations under the License. package internal import ( + "k8s.io/klog" "sync" ) @@ -29,7 +30,7 @@ type Idempotent interface { String() string } -// InFlight is a struct used to manage in flight requests. +// InFlight is a struct used to manage in flight requests per volumeId. type InFlight struct { mux *sync.Mutex inFlight map[string]bool @@ -43,28 +44,27 @@ func NewInFlight() *InFlight { } } -// Insert inserts the entry to the current list of inflight requests. +// Insert inserts the entry to the current list of inflight request key is volumeId for node and req hash for controller . // Returns false when the key already exists. -func (db *InFlight) Insert(entry Idempotent) bool { +func (db *InFlight) Insert(key string) bool { db.mux.Lock() defer db.mux.Unlock() - hash := entry.String() - - _, ok := db.inFlight[hash] + _, ok := db.inFlight[key] if ok { return false } - db.inFlight[hash] = true + db.inFlight[key] = true return true } // Delete removes the entry from the inFlight entries map. // It doesn't return anything, and will do nothing if the specified key doesn't exist. -func (db *InFlight) Delete(h Idempotent) { +func (db *InFlight) Delete(key string) { db.mux.Lock() defer db.mux.Unlock() - delete(db.inFlight, h.String()) + delete(db.inFlight, key) + klog.V(4).Infof("Node Service: volume=%q operation finished", key) } diff --git a/pkg/driver/internal/inflight_test.go b/pkg/driver/internal/inflight_test.go index b0d5c824d7..faaeb74c13 100644 --- a/pkg/driver/internal/inflight_test.go +++ b/pkg/driver/internal/inflight_test.go @@ -18,37 +18,14 @@ package internal import ( "testing" - - "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util" ) type testRequest struct { - request *csi.CreateVolumeRequest - expResp bool - delete bool + volumeId string + expResp bool + delete bool } -var stdVolCap = []*csi.VolumeCapability{ - { - AccessType: &csi.VolumeCapability_Mount{ - Mount: &csi.VolumeCapability_MountVolume{}, - }, - AccessMode: &csi.VolumeCapability_AccessMode{ - Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, - }, - }, -} - -var ( - stdVolSize = int64(5 * util.GiB) - stdCapRange = &csi.CapacityRange{RequiredBytes: stdVolSize} - stdParams = map[string]string{ - "key1": "value1", - "key2": "value2", - } -) - func TestInFlight(t *testing.T) { testCases := []struct { name string @@ -58,137 +35,54 @@ func TestInFlight(t *testing.T) { name: "success normal", requests: []testRequest{ { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, - }, - }, - }, - { - name: "success adding request with different name", - requests: []testRequest{ - { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, - }, - { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, - }, - }, - }, - { - name: "success adding request with different parameters", - requests: []testRequest{ - { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: map[string]string{"foo": "bar"}, - }, - expResp: true, - }, - { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - }, - expResp: true, + + volumeId: "random-vol-name", + expResp: true, }, }, }, { - name: "success adding request with different parameters", + name: "success adding request with different volumeId", requests: []testRequest{ { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: map[string]string{"foo": "bar"}, - }, - expResp: true, + volumeId: "random-vol-foobar", + expResp: true, }, { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name-foobar", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: map[string]string{"foo": "baz"}, - }, - expResp: true, + volumeId: "random-vol-name-foobar", + expResp: true, }, }, }, { - name: "failure adding copy of request", + name: "failed adding request with same volumeId", requests: []testRequest{ { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, + volumeId: "random-vol-name-foobar", + expResp: true, }, { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: false, + volumeId: "random-vol-name-foobar", + expResp: false, }, }, }, + { name: "success add, delete, add copy", requests: []testRequest{ { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, + volumeId: "random-vol-name", + expResp: true, }, { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: false, - delete: true, + volumeId: "random-vol-name", + expResp: false, + delete: true, }, { - request: &csi.CreateVolumeRequest{ - Name: "random-vol-name", - CapacityRange: stdCapRange, - VolumeCapabilities: stdVolCap, - Parameters: stdParams, - }, - expResp: true, + volumeId: "random-vol-name", + expResp: true, }, }, }, @@ -200,9 +94,9 @@ func TestInFlight(t *testing.T) { for _, r := range tc.requests { var resp bool if r.delete { - db.Delete(r.request) + db.Delete(r.volumeId) } else { - resp = db.Insert(r.request) + resp = db.Insert(r.volumeId) } if r.expResp != resp { t.Fatalf("expected insert to be %+v, got %+v", r.expResp, resp) diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 96f68f3cec..2c59f09978 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -57,6 +57,9 @@ const ( // defaultMaxEBSNitroVolumes is the limit of volumes for some smaller instances, like c5 and m5. defaultMaxEBSNitroVolumes = 25 + + // VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID + VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress" ) var ( @@ -141,14 +144,12 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } } - if ok := d.inFlight.Insert(req); !ok { - msg := fmt.Sprintf("request to stage volume=%q is already in progress", volumeID) - return nil, status.Error(codes.Internal, msg) + if ok := d.inFlight.Insert(volumeID); !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) } defer func() { - klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", req.GetVolumeId()) - d.inFlight.Delete(req) - klog.V(4).Info("donedone") + klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", volumeID) + d.inFlight.Delete(volumeID) }() devicePath, ok := req.PublishContext[DevicePathKey] @@ -218,6 +219,14 @@ func (d *nodeService) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, status.Error(codes.InvalidArgument, "Staging target not provided") } + if ok := d.inFlight.Insert(volumeID); !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + } + defer func() { + klog.V(4).Infof("NodeUnStageVolume: volume=%q operation finished", volumeID) + d.inFlight.Delete(volumeID) + }() + // Check if target directory is a mount point. GetDeviceNameFromMount // given a mnt point, finds the device from /proc/mounts // returns the device name, reference count, and error code @@ -344,6 +353,14 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") } + if ok := d.inFlight.Insert(volumeID); !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + } + defer func() { + klog.V(4).Infof("NodePublishVolume: volume=%q operation finished", volumeID) + d.inFlight.Delete(volumeID) + }() + mountOptions := []string{"bind"} if req.GetReadonly() { mountOptions = append(mountOptions, "ro") @@ -374,6 +391,13 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu if len(target) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path not provided") } + if ok := d.inFlight.Insert(volumeID); !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + } + defer func() { + klog.V(4).Infof("NodeUnPublishVolume: volume=%q operation finished", volumeID) + d.inFlight.Delete(volumeID) + }() klog.V(5).Infof("NodeUnpublishVolume: unmounting %s", target) err := d.mounter.Unmount(target) diff --git a/pkg/driver/node_test.go b/pkg/driver/node_test.go index 3a2572db19..d864f34480 100644 --- a/pkg/driver/node_test.go +++ b/pkg/driver/node_test.go @@ -477,6 +477,39 @@ func TestNodeStageVolume(t *testing.T) { } }, }, + { + name: "fail with in-flight request", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockMetadata := mocks.NewMockMetadataService(mockCtl) + mockMounter := mocks.NewMockMounter(mockCtl) + + req := &csi.NodeStageVolumeRequest{ + PublishContext: map[string]string{DevicePathKey: devicePath}, + StagingTargetPath: targetPath, + VolumeCapability: stdVolCap, + VolumeId: "vol-test", + } + + inFlight := internal.NewInFlight() + inFlight.Insert(req.VolumeId) + defer inFlight.Delete(req.VolumeId) + + awsDriver := &nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + inFlight: inFlight, + } + + _, err := awsDriver.NodeStageVolume(context.TODO(), req) + if err == nil { + t.Fatalf("Expect error but got no error") + } + expectErr(t, err, codes.Aborted) + }, + }, } for _, tc := range testCases { @@ -648,6 +681,31 @@ func TestNodeUnstageVolume(t *testing.T) { expectErr(t, err, codes.Internal) }, }, + { + name: "fail another operation in-flight on given volumeId", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockMetadata := mocks.NewMockMetadataService(mockCtl) + mockMounter := mocks.NewMockMounter(mockCtl) + + awsDriver := &nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + inFlight: internal.NewInFlight(), + } + + req := &csi.NodeUnstageVolumeRequest{ + StagingTargetPath: targetPath, + VolumeId: "vol-test", + } + + awsDriver.inFlight.Insert("vol-test") + _, err := awsDriver.NodeUnstageVolume(context.TODO(), req) + expectErr(t, err, codes.Aborted) + }, + }, } for _, tc := range testCases { @@ -1076,6 +1134,42 @@ func TestNodePublishVolume(t *testing.T) { }, }, + { + name: "fail another operation in-flight on given volumeId", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockMetadata := mocks.NewMockMetadataService(mockCtl) + mockMounter := mocks.NewMockMounter(mockCtl) + + awsDriver := &nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + inFlight: internal.NewInFlight(), + } + + req := &csi.NodePublishVolumeRequest{ + PublishContext: map[string]string{DevicePathKey: "/dev/fake"}, + StagingTargetPath: "/test/staging/path", + TargetPath: "/test/target/path", + VolumeId: "vol-test", + VolumeCapability: &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Block{ + Block: &csi.VolumeCapability_BlockVolume{}, + }, + AccessMode: &csi.VolumeCapability_AccessMode{ + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + }, + } + awsDriver.inFlight.Insert("vol-test") + + _, err := awsDriver.NodePublishVolume(context.TODO(), req) + expectErr(t, err, codes.Aborted) + + }, + }, } for _, tc := range testCases { @@ -1274,6 +1368,31 @@ func TestNodeUnpublishVolume(t *testing.T) { expectErr(t, err, codes.Internal) }, }, + { + name: "fail another operation in-flight on given volumeId", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockMetadata := mocks.NewMockMetadataService(mockCtl) + mockMounter := mocks.NewMockMounter(mockCtl) + + awsDriver := &nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + inFlight: internal.NewInFlight(), + } + + req := &csi.NodeUnpublishVolumeRequest{ + TargetPath: targetPath, + VolumeId: "vol-test", + } + + awsDriver.inFlight.Insert("vol-test") + _, err := awsDriver.NodeUnpublishVolume(context.TODO(), req) + expectErr(t, err, codes.Aborted) + }, + }, } for _, tc := range testCases { diff --git a/pkg/driver/sanity_test.go b/pkg/driver/sanity_test.go index e2bf9504ad..203608d093 100644 --- a/pkg/driver/sanity_test.go +++ b/pkg/driver/sanity_test.go @@ -47,6 +47,7 @@ func TestSanity(t *testing.T) { options: driverOptions, controllerService: controllerService{ cloud: newFakeCloudProvider(), + inFlight: internal.NewInFlight(), driverOptions: driverOptions, }, nodeService: nodeService{