From fb7d619b9b2389a8a0deeac24651acec1a13f718 Mon Sep 17 00:00:00 2001 From: Ricardo Maraschini Date: Thu, 16 Feb 2023 22:29:04 +0100 Subject: [PATCH] bug: checking if claim ref is set before changing reclaim policy. before changing the pv reclaim policy we now check if the pv has a claim ref set. changing the reclaim policy in an not yet associated pv may result in volume lost. --- pkg/migrate/migrate.go | 37 ++++++++----- pkg/migrate/migrate_test.go | 104 ++++++++++++++++++++++++++++++++---- 2 files changed, 119 insertions(+), 22 deletions(-) diff --git a/pkg/migrate/migrate.go b/pkg/migrate/migrate.go index 375379b..9df1de8 100644 --- a/pkg/migrate/migrate.go +++ b/pkg/migrate/migrate.go @@ -592,7 +592,7 @@ func newPvcName(originalName string) string { } // get a PV, apply the selected mutator to the PV, update the PV, use the supplied validator to wait for the update to show up -func mutatePV(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, pvName string, mutator func(volume *corev1.PersistentVolume) *corev1.PersistentVolume, checker func(volume *corev1.PersistentVolume) bool) error { +func mutatePV(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, pvName string, mutator func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error), checker func(volume *corev1.PersistentVolume) bool) error { tries := 0 for { pv, err := clientset.CoreV1().PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{}) @@ -600,7 +600,16 @@ func mutatePV(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, return fmt.Errorf("failed to get persistent volumes %s: %w", pvName, err) } - pv = mutator(pv) + pv, err = mutator(pv) + if err != nil { + if tries > 5 { + return fmt.Errorf("failed to determine new PV %s: %w", pvName, err) + } + w.Printf("Failed to determine new PV %s, waiting 5s to retry\n", pvName) + time.Sleep(time.Second * 5) + tries++ + continue + } _, err = clientset.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{}) if err != nil { @@ -943,10 +952,10 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, // mark PVs used by both originalPVC and migratedPVC as to-be-retained w.Printf("Marking original PV %s as to-be-retained\n", originalPVC.Spec.VolumeName) var originalReclaim corev1.PersistentVolumeReclaimPolicy - err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume { + err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) { originalReclaim = volume.Spec.PersistentVolumeReclaimPolicy volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain - return volume + return volume, nil }, func(volume *corev1.PersistentVolume) bool { return volume.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimRetain }) @@ -955,7 +964,7 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, } w.Printf("Marking migrated-to PV %s as to-be-retained\n", migratedPVC.Spec.VolumeName) - err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume { + err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) { // add annotations describing what PVC this data came from in case of a failure later volume.Annotations[sourceNsAnnotation] = ns volume.Annotations[sourcePVCAnnotation] = pvcName @@ -964,7 +973,7 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, volume.Annotations[desiredReclaimAnnotation] = string(originalReclaim) volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain - return volume + return volume, nil }, func(volume *corev1.PersistentVolume) bool { return volume.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimRetain }) @@ -998,9 +1007,9 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, // remove claimrefs from original and migrated-to PVs w.Printf("Removing claimref from original PV %s\n", originalPVC.Spec.VolumeName) - err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume { + err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) { volume.Spec.ClaimRef = nil - return volume + return volume, nil }, func(volume *corev1.PersistentVolume) bool { return volume.Spec.ClaimRef == nil }) @@ -1008,9 +1017,9 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, return fmt.Errorf("failed to remove claimrefs from PV %s: %w", originalPVC.Spec.VolumeName, err) } w.Printf("Removing claimref from migrated-to PV %s\n", migratedPVC.Spec.VolumeName) - err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume { + err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) { volume.Spec.ClaimRef = nil - return volume + return volume, nil }, func(volume *corev1.PersistentVolume) bool { return volume.Spec.ClaimRef == nil }) @@ -1089,7 +1098,11 @@ func waitForDeletion(ctx context.Context, clientset k8sclient.Interface, pvcName // If 'reclaim' is not specified and the annotation does not exist, the reclaim policy will not be updated. // in either case, the annotation will be removed. func resetReclaimPolicy(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, pvName string, reclaim *corev1.PersistentVolumeReclaimPolicy) error { - err := mutatePV(ctx, w, clientset, pvName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume { + err := mutatePV(ctx, w, clientset, pvName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) { + if volume.Spec.ClaimRef == nil { + return nil, fmt.Errorf("PV claimRef not set") + } + if reclaim != nil { volume.Spec.PersistentVolumeReclaimPolicy = *reclaim } else { @@ -1097,7 +1110,7 @@ func resetReclaimPolicy(ctx context.Context, w *log.Logger, clientset k8sclient. volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimPolicy(annotationVal) } } - return volume + return volume, nil }, func(volume *corev1.PersistentVolume) bool { if reclaim != nil { return volume.Spec.PersistentVolumeReclaimPolicy == *reclaim diff --git a/pkg/migrate/migrate_test.go b/pkg/migrate/migrate_test.go index 886ee31..7ac5a14 100644 --- a/pkg/migrate/migrate_test.go +++ b/pkg/migrate/migrate_test.go @@ -217,7 +217,7 @@ func TestMutatePV(t *testing.T) { resources []runtime.Object pvname string wantErr bool - ttmutator func(volume *corev1.PersistentVolume) *corev1.PersistentVolume + ttmutator func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) ttchecker func(volume *corev1.PersistentVolume) bool validate func(clientset k8sclient.Interface, t *testing.T) error }{ @@ -236,9 +236,9 @@ func TestMutatePV(t *testing.T) { }, }, }, - ttmutator: func(volume *corev1.PersistentVolume) *corev1.PersistentVolume { + ttmutator: func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) { volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain - return volume + return volume, nil }, ttchecker: func(volume *corev1.PersistentVolume) bool { return volume.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimRetain @@ -1217,18 +1217,55 @@ func Test_swapPVs(t *testing.T) { sourceScName := "sourceScName" destScName := "destScName" tests := []struct { - name string - resources []runtime.Object - wantPVs []corev1.PersistentVolume - wantPVCs []corev1.PersistentVolumeClaim - ns string - pvcName string - wantErr bool + name string + resources []runtime.Object + wantPVs []corev1.PersistentVolume + wantPVCs []corev1.PersistentVolumeClaim + ns string + pvcName string + wantErr bool + backgroundFunc func(context.Context, *log.Logger, k8sclient.Interface) }{ { name: "swap one PVC", ns: "testns", pvcName: "sourcepvc", + backgroundFunc: func(ctx context.Context, logger *log.Logger, k k8sclient.Interface) { + // watch for the statefulset to be scaled down, and then delete the pod + for { + select { + case <-time.After(time.Second / 100): + // check statefulset, maybe delete pod + pvcs, err := k.CoreV1().PersistentVolumeClaims("testns").List(ctx, metav1.ListOptions{}) + if err != nil { + logger.Printf("got listing PVCs: %s", err.Error()) + continue + } + + for _, pvc := range pvcs.Items { + if pvc.Spec.VolumeName != "" { + logger.Printf("setting pv %s claim ref to pvc %s", pvc.Spec.VolumeName, pvc.Name) + mutatePV(ctx, logger, k, pvc.Spec.VolumeName, + func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) { + volume.Spec.ClaimRef = &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + Namespace: "testns", + Name: pvc.Name, + } + return volume, nil + }, + func(volume *corev1.PersistentVolume) bool { + return true + }, + ) + } + } + case <-ctx.Done(): + return + } + } + }, resources: []runtime.Object{ // two PVCs &corev1.PersistentVolumeClaim{ @@ -1402,6 +1439,12 @@ func Test_swapPVs(t *testing.T) { }, PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimDelete, StorageClassName: sourceScName, + ClaimRef: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + Namespace: "testns", + Name: "sourcepvc", + }, }, Status: corev1.PersistentVolumeStatus{ Phase: corev1.VolumeBound, @@ -1442,6 +1485,11 @@ func Test_swapPVs(t *testing.T) { req := require.New(t) clientset := fake.NewSimpleClientset(tt.resources...) testlog := log.New(testWriter{t: t}, "", 0) + + if tt.backgroundFunc != nil { + go tt.backgroundFunc(context.Background(), testlog, clientset) + } + err := swapPVs(context.Background(), testlog, clientset, tt.ns, tt.pvcName) if tt.wantErr { req.Error(err) @@ -1489,6 +1537,12 @@ func Test_resetReclaimPolicy(t *testing.T) { }, Spec: corev1.PersistentVolumeSpec{ PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle, + ClaimRef: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + Namespace: "testns", + Name: "sourcepvc", + }, }, }, }, @@ -1507,6 +1561,12 @@ func Test_resetReclaimPolicy(t *testing.T) { }, Spec: corev1.PersistentVolumeSpec{ PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimDelete, + ClaimRef: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + Namespace: "testns", + Name: "sourcepvc", + }, }, }, }, @@ -1530,6 +1590,12 @@ func Test_resetReclaimPolicy(t *testing.T) { }, Spec: corev1.PersistentVolumeSpec{ PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle, + ClaimRef: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + Namespace: "testns", + Name: "sourcepvc", + }, }, }, }, @@ -1548,6 +1614,12 @@ func Test_resetReclaimPolicy(t *testing.T) { }, Spec: corev1.PersistentVolumeSpec{ PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRetain, + ClaimRef: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + Namespace: "testns", + Name: "sourcepvc", + }, }, }, }, @@ -1569,6 +1641,12 @@ func Test_resetReclaimPolicy(t *testing.T) { }, Spec: corev1.PersistentVolumeSpec{ PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle, + ClaimRef: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + Namespace: "testns", + Name: "sourcepvc", + }, }, }, }, @@ -1586,6 +1664,12 @@ func Test_resetReclaimPolicy(t *testing.T) { }, Spec: corev1.PersistentVolumeSpec{ PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle, + ClaimRef: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + Namespace: "testns", + Name: "sourcepvc", + }, }, }, },