diff --git a/pkg/util/provider/drain/drain.go b/pkg/util/provider/drain/drain.go
index 17d2fe978..660a83a9c 100644
--- a/pkg/util/provider/drain/drain.go
+++ b/pkg/util/provider/drain/drain.go
@@ -33,8 +33,6 @@ import (
 	"time"
 
 	"github.com/Masterminds/semver/v3"
-	"github.com/gardener/machine-controller-manager/pkg/util/k8sutils"
-	"github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
 	corev1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	policyv1beta1 "k8s.io/api/policy/v1beta1"
@@ -49,6 +47,9 @@ import (
 	policyv1listers "k8s.io/client-go/listers/policy/v1"
 	policyv1beta1listers "k8s.io/client-go/listers/policy/v1beta1"
 	"k8s.io/klog/v2"
+
+	"github.com/gardener/machine-controller-manager/pkg/util/k8sutils"
+	"github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
 )
 
 // Options are configurable options while draining a node before deletion
@@ -90,11 +91,36 @@ type fatal struct {
 	string
 }
 
-// PodVolumeInfo is the struct used to hold the PersistentVolumeID and volumeID
-// for all the PVs attached to the pod
+// PodVolumeInfo holds a list of infos about PersistentVolumes referenced by a single pod.
 type PodVolumeInfo struct {
-	persistentVolumeList []string
-	volumeList           []string
+	// volumes is the list of infos about all PersistentVolumes referenced by a pod via PersistentVolumeClaims.
+	volumes []VolumeInfo
+}
+
+// PersistentVolumeNames returns the names of all PersistentVolumes used by the pod.
+func (p PodVolumeInfo) PersistentVolumeNames() []string {
+	out := make([]string, 0, len(p.volumes))
+	for _, volume := range p.volumes {
+		out = append(out, volume.persistentVolumeName)
+	}
+	return out
+}
+
+// VolumeIDs returns the volume IDs/handles of all PersistentVolumes used by the pod.
+func (p PodVolumeInfo) VolumeIDs() []string {
+	out := make([]string, 0, len(p.volumes))
+	for _, volume := range p.volumes {
+		out = append(out, volume.volumeID)
+	}
+	return out
+}
+
+// VolumeInfo holds relevant information about a PersistentVolume for tracking attachments.
+type VolumeInfo struct {
+	// The name of the PersistentVolume referenced by a PersistentVolumeClaim used in a Pod.
+	persistentVolumeName string
+	// The volume ID/handle corresponding to the PersistentVolume name, as returned by Driver.GetVolumeIDs.
+	volumeID string
 }
 
 const (
@@ -505,40 +531,51 @@ func sortPodsByPriority(pods []*corev1.Pod) {
 	})
 }
 
-// doAccountingOfPvs returns a map with the key as a hash of
-// pod-namespace/pod-name and value as a PodVolumeInfo object
-func (o *Options) doAccountingOfPvs(ctx context.Context, pods []*corev1.Pod) map[string]PodVolumeInfo {
+// getPodVolumeInfos returns information about all PersistentVolumes of which the machine controller needs to track
+// attachments when draining a node.
+// It filters out shared PVs (used by multiple pods).
+// Also, when mapping PersistentVolume names to volume IDs, the driver filters out volumes of types that don't belong
+// to the respective cloud provider. E.g., this filters out CSI volumes of unrelated drivers and NFS volumes, etc.
+func (o *Options) getPodVolumeInfos(ctx context.Context, pods []*corev1.Pod) map[string]PodVolumeInfo {
 	var (
-		pvMap            = make(map[string][]string)
-		podVolumeInfoMap = make(map[string]PodVolumeInfo)
+		persistentVolumeNamesByPod = make(map[string][]string)
+		podVolumeInfos             = make(map[string]PodVolumeInfo)
 	)
 
 	for _, pod := range pods {
-		podPVs, _ := o.getPVList(pod)
-		pvMap[getPodKey(pod)] = podPVs
+		persistentVolumeNamesByPod[getPodKey(pod)] = o.getPersistentVolumeNamesForPod(pod)
 	}
 
 	// Filter the list of shared PVs
-	filterSharedPVs(pvMap)
+	filterSharedPVs(persistentVolumeNamesByPod)
 
-	for podKey, persistentVolumeList := range pvMap {
-		persistentVolumeListDeepCopy := persistentVolumeList
-		volumeList, err := o.getVolIDsFromDriver(ctx, persistentVolumeList)
-		if err != nil {
-			// In case of error, log and skip this set of volumes
-			klog.Errorf("error getting volume ID from cloud provider. Skipping volumes for pod: %v. Err: %v", podKey, err)
-			continue
-		}
+	for podKey, persistentVolumeNames := range persistentVolumeNamesByPod {
+		podVolumeInfo := PodVolumeInfo{}
 
-		podVolumeInfo := PodVolumeInfo{
-			persistentVolumeList: persistentVolumeListDeepCopy,
-			volumeList:           volumeList,
+		for _, persistentVolumeName := range persistentVolumeNames {
+			volumeID, err := o.getVolumeIDFromDriver(ctx, persistentVolumeName)
+			if err != nil {
+				// In case of error, log and skip this volume
+				klog.Errorf("error getting volume ID from cloud provider. Skipping volume %s for pod: %v. Err: %v", persistentVolumeName, podKey, err)
+				continue
+			}
+
+			// Only track this PV's attachment during drain operations if the driver returned a volume ID for it.
+			if volumeID != "" {
+				podVolumeInfo.volumes = append(podVolumeInfo.volumes, VolumeInfo{
+					persistentVolumeName: persistentVolumeName,
+					volumeID:             volumeID,
+				})
+			} else {
+				klog.V(4).Infof("Driver did not return a volume ID for volume %s. Skipping provider-unrelated volume for pod %s", persistentVolumeName, podKey)
+			}
 		}
-		podVolumeInfoMap[podKey] = podVolumeInfo
+
+		podVolumeInfos[podKey] = podVolumeInfo
 	}
-	klog.V(4).Infof("PodVolumeInfoMap = %v", podVolumeInfoMap)
+	klog.V(4).Infof("PodVolumeInfos: %v", podVolumeInfos)
 
-	return podVolumeInfoMap
+	return podVolumeInfos
 }
 
 // filterSharedPVs filters out the PVs that are shared among pods.
@@ -585,7 +622,7 @@ func (o *Options) evictPodsWithPv(ctx context.Context, attemptEvict bool, pods [
 ) {
 	sortPodsByPriority(pods)
 
-	podVolumeInfoMap := o.doAccountingOfPvs(ctx, pods)
+	podVolumeInfoMap := o.getPodVolumeInfos(ctx, pods)
 
 	var (
 		remainingPods []*corev1.Pod
@@ -597,7 +634,7 @@ func (o *Options) evictPodsWithPv(ctx context.Context, attemptEvict bool, pods [
 	for i := 0; i < nretries; i++ {
 		remainingPods, fastTrack = o.evictPodsWithPVInternal(ctx, attemptEvict, pods, podVolumeInfoMap, policyGroupVersion, getPodFn, returnCh)
 		if fastTrack || len(remainingPods) == 0 {
-			//Either all pods got evicted or we need to fast track the return (node deletion detected)
+			// Either all pods got evicted or we need to fast track the return (node deletion detected)
 			break
 		}
 
@@ -768,7 +805,7 @@ func (o *Options) evictPodsWithPVInternal(
 				o.checkAndDeleteWorker(volumeAttachmentEventCh)
 				continue
 			}
-			klog.Warningf("Timeout occurred for following volumes to reattach: %v", podVolumeInfo.persistentVolumeList)
+			klog.Warningf("Timeout occurred for following volumes to reattach: %v", podVolumeInfo.volumes)
 		}
 
 		o.checkAndDeleteWorker(volumeAttachmentEventCh)
@@ -786,8 +823,9 @@ func (o *Options) evictPodsWithPVInternal(
 	return retryPods, false
 }
 
-func (o *Options) getPVList(pod *corev1.Pod) ([]string, error) {
-	pvs := []string{}
+func (o *Options) getPersistentVolumeNamesForPod(pod *corev1.Pod) []string {
+	var pvs []string
+
 	for i := range pod.Spec.Volumes {
 		vol := &pod.Spec.Volumes[i]
 
@@ -819,17 +857,18 @@ func (o *Options) getPVList(pod *corev1.Pod) ([]string, error) {
 			}
 		}
 	}
-	return pvs, nil
+
+	return pvs
 }
 
 func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo, nodeName string) error {
-	if len(podVolumeInfo.volumeList) == 0 || nodeName == "" {
+	if len(podVolumeInfo.volumes) == 0 || nodeName == "" {
 		// If volume or node name is not available, nothing to do. Just log this as warning
-		klog.Warningf("Node name: %q, list of pod PVs to wait for detach: %v", nodeName, podVolumeInfo.volumeList)
+		klog.Warningf("Node name: %q, list of pod PVs to wait for detach: %v", nodeName, podVolumeInfo.volumes)
 		return nil
 	}
 
-	klog.V(4).Info("Waiting for following volumes to detach: ", podVolumeInfo.volumeList)
+	klog.V(3).Infof("Waiting for following volumes to detach: %v", podVolumeInfo.volumes)
 
 	found := true
 
@@ -852,7 +891,7 @@ func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo
 			return err
 		}
 
-		klog.V(4).Infof("No of attached volumes for node %q is %s", nodeName, node.Status.VolumesAttached)
+		klog.V(4).Infof("Volumes attached to node %q: %s", nodeName, node.Status.VolumesAttached)
 		attachedVols := node.Status.VolumesAttached
 		if len(attachedVols) == 0 {
 			klog.V(4).Infof("No volumes attached to the node %q", nodeName)
@@ -860,7 +899,7 @@ func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo
 		}
 
 	LookUpVolume:
-		for _, volumeID := range podVolumeInfo.volumeList {
+		for _, volumeID := range podVolumeInfo.VolumeIDs() {
 			for j := range attachedVols {
 				attachedVol := &attachedVols[j]
 
@@ -869,7 +908,7 @@ func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo
 				if found {
 					klog.V(4).Infof(
-						"Found volume:%s still attached to node %q. Will re-check in %s",
+						"Found volume %q still attached to node %q. Will re-check in %s",
 						volumeID,
 						nodeName,
 						VolumeDetachPollInterval,
@@ -881,7 +920,7 @@ func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo
 		}
 	}
 
-	klog.V(4).Infof("Detached volumes:%s from node %q", podVolumeInfo.volumeList, nodeName)
+	klog.V(3).Infof("Detached volumes %v from node %q", podVolumeInfo.volumes, nodeName)
 
 	return nil
 }
@@ -899,18 +938,18 @@ func isDesiredReattachment(volumeAttachment *storagev1.VolumeAttachment, previou
 // 1. If CSI is enabled use determine reattach
 // 2. If all else fails, fallback to static timeout
 func (o *Options) waitForReattach(ctx context.Context, podVolumeInfo PodVolumeInfo, previousNodeName string, volumeAttachmentEventCh chan *storagev1.VolumeAttachment) error {
-	if len(podVolumeInfo.persistentVolumeList) == 0 || previousNodeName == "" {
+	if len(podVolumeInfo.volumes) == 0 || previousNodeName == "" {
 		// If volume or node name is not available, nothing to do. Just log this as warning
-		klog.Warningf("List of pod PVs waiting for reattachment is 0: %v", podVolumeInfo.persistentVolumeList)
+		klog.Warningf("List of pod PVs waiting for reattachment is 0: %v", podVolumeInfo.volumes)
 		return nil
 	}
 
-	klog.V(3).Infof("Waiting for following volumes to reattach: %v", podVolumeInfo.persistentVolumeList)
+	klog.V(3).Infof("Waiting for following volumes to reattach: %v", podVolumeInfo.volumes)
 
 	var pvsWaitingForReattachments map[string]bool
 	if volumeAttachmentEventCh != nil {
 		pvsWaitingForReattachments = make(map[string]bool)
-		for _, persistentVolumeName := range podVolumeInfo.persistentVolumeList {
+		for _, persistentVolumeName := range podVolumeInfo.PersistentVolumeNames() {
 			pvsWaitingForReattachments[persistentVolumeName] = true
 		}
 	}
@@ -923,7 +962,7 @@ func (o *Options) waitForReattach(ctx context.Context, podVolumeInfo PodVolumeIn
 
 		case <-ctx.Done():
 			// Timeout occurred waiting for reattachment, exit function with error
-			klog.Warningf("Timeout occurred while waiting for PVs %v to reattach to a different node", podVolumeInfo.persistentVolumeList)
+			klog.Warningf("Timeout occurred while waiting for PVs %v to reattach to a different node", podVolumeInfo.volumes)
 			return fmt.Errorf("%s", reattachTimeoutErr)
 
 		case incomingEvent := <-volumeAttachmentEventCh:
@@ -945,39 +984,45 @@
 		}
 	}
 
-	klog.V(3).Infof("Successfully reattached volumes: %s", podVolumeInfo.persistentVolumeList)
+	klog.V(3).Infof("Successfully reattached volumes: %s", podVolumeInfo.volumes)
 	return nil
 }
 
-func (o *Options) getVolIDsFromDriver(ctx context.Context, pvNames []string) ([]string, error) {
-	pvSpecs := []*corev1.PersistentVolumeSpec{}
+func (o *Options) getVolumeIDFromDriver(ctx context.Context, pvName string) (string, error) {
+	var pvSpec *corev1.PersistentVolumeSpec
 
-	for _, pvName := range pvNames {
-		try := 0
+	try := 0
 
-		for {
-			pv, err := o.pvLister.Get(pvName)
+	for {
+		pv, err := o.pvLister.Get(pvName)
 
-			if apierrors.IsNotFound(err) {
-				break
-			} else if err != nil {
-				try++
-				if try == GetPvDetailsMaxRetries {
-					break
-				}
-				// In case of error, try again after few seconds
-				time.Sleep(GetPvDetailsRetryInterval)
-				continue
+		if apierrors.IsNotFound(err) {
+			return "", nil
+		} else if err != nil {
+			try++
+			if try == GetPvDetailsMaxRetries {
+				return "", err
 			}
-
-			// Found PV; append and exit
-			pvSpecs = append(pvSpecs, &pv.Spec)
-			break
+			// In case of error, try again after few seconds
+			time.Sleep(GetPvDetailsRetryInterval)
+			continue
 		}
+
+		// found PV
+		pvSpec = &pv.Spec
+		break
+	}
+
+	response, err := o.Driver.GetVolumeIDs(ctx, &driver.GetVolumeIDsRequest{PVSpecs: []*corev1.PersistentVolumeSpec{pvSpec}})
+	if err != nil {
+		return "", err
 	}
-	response, err := o.Driver.GetVolumeIDs(ctx, &driver.GetVolumeIDsRequest{PVSpecs: pvSpecs})
-	return response.VolumeIDs, err
+	if len(response.VolumeIDs) > 0 {
+		return response.VolumeIDs[0], nil
+	}
+
+	return "", nil
 }
 
 func (o *Options) evictPodWithoutPVInternal(ctx context.Context, attemptEvict bool, pod *corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error), returnCh chan error) {
@@ -1076,8 +1121,8 @@ func (o *Options) waitForDelete(pods []*corev1.Pod, interval, timeout time.Durat
 	for i, pod := range pods {
 		p, err := getPodFn(pod.Namespace, pod.Name)
 		if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
-			//cmdutil.PrintSuccess(o.mapper, false, o.Out, "pod", pod.Name, false, verbStr)
-			//klog.Info("pod deleted successfully found")
+			// cmdutil.PrintSuccess(o.mapper, false, o.Out, "pod", pod.Name, false, verbStr)
+			// klog.Info("pod deleted successfully found")
 			continue
 		} else if err != nil {
 			return false, err
diff --git a/pkg/util/provider/drain/drain_test.go b/pkg/util/provider/drain/drain_test.go
index 4de8f185e..0bb450235 100644
--- a/pkg/util/provider/drain/drain_test.go
+++ b/pkg/util/provider/drain/drain_test.go
@@ -13,19 +13,23 @@ import (
 	"sync"
 	"time"
 
-	"github.com/gardener/machine-controller-manager/pkg/fakeclient"
-	"github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
+	"github.com/onsi/gomega/gcustom"
+	gomegatypes "github.com/onsi/gomega/types"
 	corev1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	coreinformers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	k8stesting "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
+
+	"github.com/gardener/machine-controller-manager/pkg/fakeclient"
+	"github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
 )
 
 var _ = Describe("drain", func() {
@@ -854,6 +858,134 @@ var _ = Describe("drain", func() {
 			minDrainDuration: terminationGracePeriodMedium,
 		}),
 	)
+
+	Describe("getPodVolumeInfos", func() {
+		var (
+			ctx context.Context
+
+			drain *Options
+
+			pvInformer, pvcInformer cache.SharedIndexInformer
+
+			pods []*corev1.Pod
+		)
+
+		BeforeEach(func() {
+			ctx = context.Background()
+
+			kubeInformerFactory := coreinformers.NewSharedInformerFactory(nil, 0)
+
+			pvInformer = kubeInformerFactory.Core().V1().PersistentVolumes().Informer()
+			_ = pvInformer
+			pvcInformer = kubeInformerFactory.Core().V1().PersistentVolumeClaims().Informer()
+			_ = pvcInformer
+
+			drain = &Options{
+				ErrOut: GinkgoWriter,
+				Out:    GinkgoWriter,
+
+				Driver:    &drainDriver{},
+				pvLister:  kubeInformerFactory.Core().V1().PersistentVolumes().Lister(),
+				pvcLister: kubeInformerFactory.Core().V1().PersistentVolumeClaims().Lister(),
+			}
+
+			pods = []*corev1.Pod{}
+		})
+
+		It("should return empty map for empty pod list", func() {
+			Expect(drain.getPodVolumeInfos(ctx, pods)).To(BeEmpty())
+		})
+
+		It("should return empty volume list for pods without volumes", func() {
+			pods = append(pods, getPodsWithoutPV(2, "foo", "bar", "", terminationGracePeriodDefault, nil)...)
+
+			podVolumeInfos := drain.getPodVolumeInfos(ctx, pods)
+			Expect(podVolumeInfos).To(HaveLen(2))
+			Expect(podVolumeInfos).To(And(
+				HaveKeyWithValue("foo/bar0", matchPodPersistentVolumeNames(BeEmpty())),
+				HaveKeyWithValue("foo/bar1", matchPodPersistentVolumeNames(BeEmpty())),
+			))
+		})
+
+		It("should return list of exclusive volumes", func() {
+			pods = append(pods, getPodWithPV("foo", "bar", "exclusive", "", "", terminationGracePeriodDefault, nil, 1))
+
+			pvcs := getPVCs(pods)
+			addAll(pvcInformer, pvcs...)
+			addAll(pvInformer, appendSuffixToVolumeHandles(getPVs(pvcs), "-id")...)
+
+			podVolumeInfos := drain.getPodVolumeInfos(ctx, pods)
+			Expect(podVolumeInfos).To(HaveLen(1))
+			Expect(podVolumeInfos).To(HaveKeyWithValue(
+				"foo/bar", And(
+					matchPodPersistentVolumeNames(ConsistOf("exclusive-0")),
+					matchPodVolumeIDs(ConsistOf("exclusive-0-id")),
+				),
+			))
+		})
+
+		It("should filter out shared volumes", func() {
+			pods = append(pods, getPodsWithPV(2, 2, 1, 1, "foo", "bar", "exclusive", "shared", "", terminationGracePeriodDefault, nil)...)
+
+			pvcs := getPVCs(pods)
+			addAll(pvcInformer, pvcs...)
+			addAll(pvInformer, getPVs(pvcs)...)
+
+			podVolumeInfos := drain.getPodVolumeInfos(ctx, pods)
+			Expect(podVolumeInfos).To(HaveLen(2))
+			Expect(podVolumeInfos).To(And(
+				HaveKeyWithValue(
+					"foo/bar0", matchPodPersistentVolumeNames(ConsistOf("exclusive0-0")),
+				),
+				HaveKeyWithValue(
+					"foo/bar1", matchPodPersistentVolumeNames(ConsistOf("exclusive1-0")),
+				),
+			))
+		})
+
+		It("should filter out provider-unrelated volumes", func() {
+			pods = append(pods, getPodWithPV("foo", "bar", "exclusive", "", "", terminationGracePeriodDefault, nil, 1))
+
+			pvcs := getPVCs(pods)
+			addAll(pvcInformer, pvcs...)
+
+			pvs := getPVs(pvcs)
+			pvs[0].Spec.CSI = nil
+			pvs[0].Spec.NFS = &corev1.NFSVolumeSource{
+				Server: "my-nfs-server.example.com",
+				Path:   "/my-share",
+			}
+			addAll(pvInformer, pvs...)
+
+			podVolumeInfos := drain.getPodVolumeInfos(ctx, pods)
+			Expect(podVolumeInfos).To(HaveLen(1))
+			Expect(podVolumeInfos).To(HaveKeyWithValue(
+				"foo/bar", matchPodPersistentVolumeNames(BeEmpty()),
+			))
+		})
+
+		It("should filter out non-existing PVCs", func() {
+			pods = append(pods, getPodWithPV("foo", "bar", "exclusive", "", "", terminationGracePeriodDefault, nil, 1))
+
+			podVolumeInfos := drain.getPodVolumeInfos(ctx, pods)
+			Expect(podVolumeInfos).To(HaveLen(1))
+			Expect(podVolumeInfos).To(HaveKeyWithValue(
+				"foo/bar", matchPodPersistentVolumeNames(BeEmpty()),
+			))
+		})
+
+		It("should filter out non-existing persistent volumes", func() {
+			pods = append(pods, getPodWithPV("foo", "bar", "exclusive", "", "", terminationGracePeriodDefault, nil, 1))
+
+			addAll(pvcInformer, getPVCs(pods)...)
+
+			podVolumeInfos := drain.getPodVolumeInfos(ctx, pods)
+			Expect(podVolumeInfos).To(HaveLen(1))
+			Expect(podVolumeInfos).To(HaveKeyWithValue(
+				"foo/bar", matchPodPersistentVolumeNames(BeEmpty()),
+			))
+		})
+	})
 })
 
 func getPodWithoutPV(ns, name, nodeName string, terminationGracePeriod time.Duration, labels map[string]string) *corev1.Pod {
@@ -1053,10 +1185,15 @@ type drainDriver struct {
 }
 
 func (d *drainDriver) GetVolumeIDs(_ context.Context, req *driver.GetVolumeIDsRequest) (*driver.GetVolumeIDsResponse, error) {
-	volNames := make([]string, len(req.PVSpecs))
-	for i := range req.PVSpecs {
-		volNames[i] = getDrainTestVolumeName(req.PVSpecs[i])
+	volNames := make([]string, 0, len(req.PVSpecs))
+
+	for _, spec := range req.PVSpecs {
+		// real drivers filter volumes in GetVolumeIDs and only return IDs of provider-related volumes
+		if volumeName := getDrainTestVolumeName(spec); volumeName != "" {
+			volNames = append(volNames, volumeName)
+		}
 	}
+
 	return &driver.GetVolumeIDsResponse{
 		VolumeIDs: volNames,
 	}, nil
@@ -1146,3 +1283,32 @@ func updateVolumeAttachments(drainOptions *Options, pvName string, nodeName stri
 
 	drainOptions.volumeAttachmentHandler.AddVolumeAttachment(newVolumeAttachment)
 }
+
+// matchPodPersistentVolumeNames applies the given matcher to the result of PodVolumeInfo.PersistentVolumeNames().
+func matchPodPersistentVolumeNames(matcher gomegatypes.GomegaMatcher) gomegatypes.GomegaMatcher {
+	return gcustom.MakeMatcher(func(actual PodVolumeInfo) (bool, error) {
+		return matcher.Match(actual.PersistentVolumeNames())
+	})
+}
+
+// matchPodVolumeIDs applies the given matcher to the result of PodVolumeInfo.VolumeIDs().
+func matchPodVolumeIDs(matcher gomegatypes.GomegaMatcher) gomegatypes.GomegaMatcher {
+	return gcustom.MakeMatcher(func(actual PodVolumeInfo) (bool, error) {
+		return matcher.Match(actual.VolumeIDs())
+	})
+}
+
+func addAll[T runtime.Object](informer cache.SharedIndexInformer, objects ...T) {
+	GinkgoHelper()
+
+	for _, object := range objects {
+		Expect(informer.GetStore().Add(object)).NotTo(HaveOccurred())
+	}
+}
+
+func appendSuffixToVolumeHandles(pvs []*corev1.PersistentVolume, suffix string) []*corev1.PersistentVolume {
+	for _, pv := range pvs {
+		pv.Spec.CSI.VolumeHandle += suffix
+	}
+	return pvs
+}