Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't track reattachment of provider-unrelated PVs #937

Merged
merged 3 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 116 additions & 71 deletions pkg/util/provider/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"time"

"github.com/Masterminds/semver/v3"
"github.com/gardener/machine-controller-manager/pkg/util/k8sutils"
"github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
Expand All @@ -49,6 +47,9 @@ import (
policyv1listers "k8s.io/client-go/listers/policy/v1"
policyv1beta1listers "k8s.io/client-go/listers/policy/v1beta1"
"k8s.io/klog/v2"

"github.com/gardener/machine-controller-manager/pkg/util/k8sutils"
"github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
)

// Options are configurable options while draining a node before deletion
Expand Down Expand Up @@ -90,11 +91,36 @@ type fatal struct {
string
}

// PodVolumeInfo is the struct used to hold the PersistentVolumeID and volumeID
// for all the PVs attached to the pod
// PodVolumeInfo holds a list of infos about PersistentVolumes referenced by a single pod.
type PodVolumeInfo struct {
persistentVolumeList []string
volumeList []string
// volumes is the list of infos about all PersistentVolumes referenced by a pod via PersistentVolumeClaims.
volumes []VolumeInfo
}

// PersistentVolumeNames returns the names of all PersistentVolumes used by the pod.
func (p PodVolumeInfo) PersistentVolumeNames() []string {
out := make([]string, 0, len(p.volumes))
for _, volume := range p.volumes {
out = append(out, volume.persistentVolumeName)
}
return out
}

// VolumeIDs returns the volume IDs/handles of all PersistentVolumes used by the pod.
func (p PodVolumeInfo) VolumeIDs() []string {
out := make([]string, 0, len(p.volumes))
for _, volume := range p.volumes {
out = append(out, volume.volumeID)
}
return out
}

// VolumeInfo holds relevant information about a PersistentVolume for tracking attachments.
type VolumeInfo struct {
// The name of the PersistentVolume referenced by PersistentVolumeClaim used in a Pod.
persistentVolumeName string
// The volume ID/handle corresponding to the PersistentVolume name as return by Driver.GetVolumeIDs.
volumeID string
}

const (
Expand Down Expand Up @@ -505,40 +531,51 @@ func sortPodsByPriority(pods []*corev1.Pod) {
})
}

// doAccountingOfPvs returns a map with the key as a hash of
// pod-namespace/pod-name and value as a PodVolumeInfo object
func (o *Options) doAccountingOfPvs(ctx context.Context, pods []*corev1.Pod) map[string]PodVolumeInfo {
// getPodVolumeInfos returns information about all PersistentVolumes of which the machine controller needs to track
// attachments when draining a node.
// It filters out shared PVs (used by multiple pods).
// Also, when mapping PersistentVolume names to volume IDs, the driver filters out volumes of types that don't belong
// to the respective cloud provider. E.g., this filters out CSI volumes of unrelated drivers and NFS volumes, etc.
func (o *Options) getPodVolumeInfos(ctx context.Context, pods []*corev1.Pod) map[string]PodVolumeInfo {
var (
pvMap = make(map[string][]string)
podVolumeInfoMap = make(map[string]PodVolumeInfo)
persistentVolumeNamesByPod = make(map[string][]string)
podVolumeInfos = make(map[string]PodVolumeInfo)
)

for _, pod := range pods {
podPVs, _ := o.getPVList(pod)
pvMap[getPodKey(pod)] = podPVs
persistentVolumeNamesByPod[getPodKey(pod)] = o.getPersistentVolumeNamesForPod(pod)
}

// Filter the list of shared PVs
filterSharedPVs(pvMap)
filterSharedPVs(persistentVolumeNamesByPod)

for podKey, persistentVolumeList := range pvMap {
persistentVolumeListDeepCopy := persistentVolumeList
volumeList, err := o.getVolIDsFromDriver(ctx, persistentVolumeList)
if err != nil {
// In case of error, log and skip this set of volumes
klog.Errorf("error getting volume ID from cloud provider. Skipping volumes for pod: %v. Err: %v", podKey, err)
continue
}
for podKey, persistentVolumeNames := range persistentVolumeNamesByPod {
podVolumeInfo := PodVolumeInfo{}

podVolumeInfo := PodVolumeInfo{
persistentVolumeList: persistentVolumeListDeepCopy,
volumeList: volumeList,
for _, persistentVolumeName := range persistentVolumeNames {
volumeID, err := o.getVolumeIDFromDriver(ctx, persistentVolumeName)
if err != nil {
// In case of error, log and skip this set of volumes
klog.Errorf("error getting volume ID from cloud provider. Skipping volume %s for pod: %v. Err: %v", persistentVolumeName, podKey, err)
continue
}

// Only if the driver returns a volume ID for this PV, we want to track its attachment during drain operations.
if volumeID != "" {
podVolumeInfo.volumes = append(podVolumeInfo.volumes, VolumeInfo{
persistentVolumeName: persistentVolumeName,
volumeID: volumeID,
})
} else {
klog.V(4).Infof("Driver did not return a volume ID for volume %s. Skipping provider-unrelated volume for pod %s", persistentVolumeName, podKey)
}
}
podVolumeInfoMap[podKey] = podVolumeInfo

podVolumeInfos[podKey] = podVolumeInfo
}
klog.V(4).Infof("PodVolumeInfoMap = %v", podVolumeInfoMap)
klog.V(4).Infof("PodVolumeInfos: %v", podVolumeInfos)

return podVolumeInfoMap
return podVolumeInfos
}

// filterSharedPVs filters out the PVs that are shared among pods.
Expand Down Expand Up @@ -585,7 +622,7 @@ func (o *Options) evictPodsWithPv(ctx context.Context, attemptEvict bool, pods [
) {
sortPodsByPriority(pods)

podVolumeInfoMap := o.doAccountingOfPvs(ctx, pods)
podVolumeInfoMap := o.getPodVolumeInfos(ctx, pods)

var (
remainingPods []*corev1.Pod
Expand All @@ -597,7 +634,7 @@ func (o *Options) evictPodsWithPv(ctx context.Context, attemptEvict bool, pods [
for i := 0; i < nretries; i++ {
remainingPods, fastTrack = o.evictPodsWithPVInternal(ctx, attemptEvict, pods, podVolumeInfoMap, policyGroupVersion, getPodFn, returnCh)
if fastTrack || len(remainingPods) == 0 {
//Either all pods got evicted or we need to fast track the return (node deletion detected)
// Either all pods got evicted or we need to fast track the return (node deletion detected)
break
}

Expand Down Expand Up @@ -768,7 +805,7 @@ func (o *Options) evictPodsWithPVInternal(
o.checkAndDeleteWorker(volumeAttachmentEventCh)
continue
}
klog.Warningf("Timeout occurred for following volumes to reattach: %v", podVolumeInfo.persistentVolumeList)
klog.Warningf("Timeout occurred for following volumes to reattach: %v", podVolumeInfo.volumes)
}

o.checkAndDeleteWorker(volumeAttachmentEventCh)
Expand All @@ -786,8 +823,9 @@ func (o *Options) evictPodsWithPVInternal(
return retryPods, false
}

func (o *Options) getPVList(pod *corev1.Pod) ([]string, error) {
pvs := []string{}
func (o *Options) getPersistentVolumeNamesForPod(pod *corev1.Pod) []string {
var pvs []string

for i := range pod.Spec.Volumes {
vol := &pod.Spec.Volumes[i]

Expand Down Expand Up @@ -819,17 +857,18 @@ func (o *Options) getPVList(pod *corev1.Pod) ([]string, error) {
}
}
}
return pvs, nil

return pvs
}

func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo, nodeName string) error {
if len(podVolumeInfo.volumeList) == 0 || nodeName == "" {
if len(podVolumeInfo.volumes) == 0 || nodeName == "" {
// If volume or node name is not available, nothing to do. Just log this as warning
klog.Warningf("Node name: %q, list of pod PVs to wait for detach: %v", nodeName, podVolumeInfo.volumeList)
klog.Warningf("Node name: %q, list of pod PVs to wait for detach: %v", nodeName, podVolumeInfo.volumes)
return nil
}

klog.V(4).Info("Waiting for following volumes to detach: ", podVolumeInfo.volumeList)
klog.V(3).Infof("Waiting for following volumes to detach: %v", podVolumeInfo.volumes)

found := true

Expand All @@ -852,15 +891,15 @@ func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo
return err
}

klog.V(4).Infof("No of attached volumes for node %q is %s", nodeName, node.Status.VolumesAttached)
klog.V(4).Infof("Volumes attached to node %q: %s", nodeName, node.Status.VolumesAttached)
attachedVols := node.Status.VolumesAttached
if len(attachedVols) == 0 {
klog.V(4).Infof("No volumes attached to the node %q", nodeName)
return nil
}

LookUpVolume:
for _, volumeID := range podVolumeInfo.volumeList {
for _, volumeID := range podVolumeInfo.VolumeIDs() {

for j := range attachedVols {
attachedVol := &attachedVols[j]
Expand All @@ -869,7 +908,7 @@ func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo

if found {
klog.V(4).Infof(
"Found volume:%s still attached to node %q. Will re-check in %s",
"Found volume %q still attached to node %q. Will re-check in %s",
volumeID,
nodeName,
VolumeDetachPollInterval,
Expand All @@ -881,7 +920,7 @@ func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo
}
}

klog.V(4).Infof("Detached volumes:%s from node %q", podVolumeInfo.volumeList, nodeName)
klog.V(3).Infof("Detached volumes %v from node %q", podVolumeInfo.volumes, nodeName)
timebertt marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand All @@ -899,18 +938,18 @@ func isDesiredReattachment(volumeAttachment *storagev1.VolumeAttachment, previou
// 1. If CSI is enabled use determine reattach
// 2. If all else fails, fallback to static timeout
func (o *Options) waitForReattach(ctx context.Context, podVolumeInfo PodVolumeInfo, previousNodeName string, volumeAttachmentEventCh chan *storagev1.VolumeAttachment) error {
if len(podVolumeInfo.persistentVolumeList) == 0 || previousNodeName == "" {
if len(podVolumeInfo.volumes) == 0 || previousNodeName == "" {
// If volume or node name is not available, nothing to do. Just log this as warning
klog.Warningf("List of pod PVs waiting for reattachment is 0: %v", podVolumeInfo.persistentVolumeList)
klog.Warningf("List of pod PVs waiting for reattachment is 0: %v", podVolumeInfo.volumes)
return nil
}

klog.V(3).Infof("Waiting for following volumes to reattach: %v", podVolumeInfo.persistentVolumeList)
klog.V(3).Infof("Waiting for following volumes to reattach: %v", podVolumeInfo.volumes)

var pvsWaitingForReattachments map[string]bool
if volumeAttachmentEventCh != nil {
pvsWaitingForReattachments = make(map[string]bool)
for _, persistentVolumeName := range podVolumeInfo.persistentVolumeList {
for _, persistentVolumeName := range podVolumeInfo.PersistentVolumeNames() {
pvsWaitingForReattachments[persistentVolumeName] = true
}
}
Expand All @@ -923,7 +962,7 @@ func (o *Options) waitForReattach(ctx context.Context, podVolumeInfo PodVolumeIn

case <-ctx.Done():
// Timeout occurred waiting for reattachment, exit function with error
klog.Warningf("Timeout occurred while waiting for PVs %v to reattach to a different node", podVolumeInfo.persistentVolumeList)
klog.Warningf("Timeout occurred while waiting for PVs %v to reattach to a different node", podVolumeInfo.volumes)
return fmt.Errorf("%s", reattachTimeoutErr)

case incomingEvent := <-volumeAttachmentEventCh:
Expand All @@ -945,39 +984,45 @@ func (o *Options) waitForReattach(ctx context.Context, podVolumeInfo PodVolumeIn
}
}

klog.V(3).Infof("Successfully reattached volumes: %s", podVolumeInfo.persistentVolumeList)
klog.V(3).Infof("Successfully reattached volumes: %s", podVolumeInfo.volumes)
return nil
}

func (o *Options) getVolIDsFromDriver(ctx context.Context, pvNames []string) ([]string, error) {
pvSpecs := []*corev1.PersistentVolumeSpec{}
func (o *Options) getVolumeIDFromDriver(ctx context.Context, pvName string) (string, error) {
var pvSpec *corev1.PersistentVolumeSpec

for _, pvName := range pvNames {
try := 0
try := 0

for {
pv, err := o.pvLister.Get(pvName)
for {
pv, err := o.pvLister.Get(pvName)

if apierrors.IsNotFound(err) {
break
} else if err != nil {
try++
if try == GetPvDetailsMaxRetries {
break
}
// In case of error, try again after few seconds
time.Sleep(GetPvDetailsRetryInterval)
continue
if apierrors.IsNotFound(err) {
return "", nil
} else if err != nil {
try++
if try == GetPvDetailsMaxRetries {
return "", err
}

// Found PV; append and exit
pvSpecs = append(pvSpecs, &pv.Spec)
break
// In case of error, try again after few seconds
time.Sleep(GetPvDetailsRetryInterval)
continue
}

// found PV
pvSpec = &pv.Spec
break
}

response, err := o.Driver.GetVolumeIDs(ctx, &driver.GetVolumeIDsRequest{PVSpecs: []*corev1.PersistentVolumeSpec{pvSpec}})
Copy link
Contributor

@elankath elankath Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: The new implementation can result in excessive calls to Driver.GetVolumeIDs and is highly-dependent on the provider implementation making this cheap. Other can result in rate limit errors at provider. Earlier there was only a single call to Driver.GetVolumeIDs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is true. I knowingly changed this, because existing implementation that I looked into (aws and openstack) didn't call an API on GetVolumeIDs.
I was under the impression that drivers are expected to extract the volume IDs purely from the PV spec without the help of an external API.

If this is not the case, I can restructure the code again to perform a single GetVolumeIDs call and use the list of IDs from that instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elankath can you comment on how the PR should handle this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, the godoc for GetVolumeIDs does mention this (though we should fix the grammar). It is fine - you don't need to restructure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is not the case, I can restructure the code again to perform a single GetVolumeIDs call and use the list of IDs from that instead.

While thinking about it again, this won't be possible.
The reason is, that the driver filters the list of input volumes depending on whether it recognizes the volume. However, we won't be able to correlate the input volumes and filtered output volumes, which would be required to achieve the needed filtering in the drain logic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is fine - you don't need to restructure.

Ok, great. Then, let's go for merging this PR :)

if err != nil {
return "", err
}

response, err := o.Driver.GetVolumeIDs(ctx, &driver.GetVolumeIDsRequest{PVSpecs: pvSpecs})
return response.VolumeIDs, err
if len(response.VolumeIDs) > 0 {
return response.VolumeIDs[0], nil
}

return "", nil
}

func (o *Options) evictPodWithoutPVInternal(ctx context.Context, attemptEvict bool, pod *corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error), returnCh chan error) {
Expand Down Expand Up @@ -1076,8 +1121,8 @@ func (o *Options) waitForDelete(pods []*corev1.Pod, interval, timeout time.Durat
for i, pod := range pods {
p, err := getPodFn(pod.Namespace, pod.Name)
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
//cmdutil.PrintSuccess(o.mapper, false, o.Out, "pod", pod.Name, false, verbStr)
//klog.Info("pod deleted successfully found")
// cmdutil.PrintSuccess(o.mapper, false, o.Out, "pod", pod.Name, false, verbStr)
// klog.Info("pod deleted successfully found")
continue
} else if err != nil {
return false, err
Expand Down
Loading