Skip to content

Commit

Permalink
bug: checking if claim ref is set before changing reclaim policy.
Browse files Browse the repository at this point in the history
before changing the pv reclaim policy we now check if the pv has a claim
ref set. changing the reclaim policy in an not yet associated pv may
result in volume lost.
  • Loading branch information
ricardomaraschini committed Feb 16, 2023
1 parent 65e2284 commit fb7d619
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 22 deletions.
37 changes: 25 additions & 12 deletions pkg/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,15 +592,24 @@ func newPvcName(originalName string) string {
}

// get a PV, apply the selected mutator to the PV, update the PV, use the supplied validator to wait for the update to show up
func mutatePV(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, pvName string, mutator func(volume *corev1.PersistentVolume) *corev1.PersistentVolume, checker func(volume *corev1.PersistentVolume) bool) error {
func mutatePV(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, pvName string, mutator func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error), checker func(volume *corev1.PersistentVolume) bool) error {
tries := 0
for {
pv, err := clientset.CoreV1().PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get persistent volumes %s: %w", pvName, err)
}

pv = mutator(pv)
pv, err = mutator(pv)
if err != nil {
if tries > 5 {
return fmt.Errorf("failed to determine new PV %s: %w", pvName, err)
}
w.Printf("Failed to determine new PV %s, waiting 5s to retry\n", pvName)
time.Sleep(time.Second * 5)
tries++
continue
}

_, err = clientset.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{})
if err != nil {
Expand Down Expand Up @@ -943,10 +952,10 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
// mark PVs used by both originalPVC and migratedPVC as to-be-retained
w.Printf("Marking original PV %s as to-be-retained\n", originalPVC.Spec.VolumeName)
var originalReclaim corev1.PersistentVolumeReclaimPolicy
err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
originalReclaim = volume.Spec.PersistentVolumeReclaimPolicy
volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain
return volume
return volume, nil
}, func(volume *corev1.PersistentVolume) bool {
return volume.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimRetain
})
Expand All @@ -955,7 +964,7 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
}

w.Printf("Marking migrated-to PV %s as to-be-retained\n", migratedPVC.Spec.VolumeName)
err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
// add annotations describing what PVC this data came from in case of a failure later
volume.Annotations[sourceNsAnnotation] = ns
volume.Annotations[sourcePVCAnnotation] = pvcName
Expand All @@ -964,7 +973,7 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
volume.Annotations[desiredReclaimAnnotation] = string(originalReclaim)

volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain
return volume
return volume, nil
}, func(volume *corev1.PersistentVolume) bool {
return volume.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimRetain
})
Expand Down Expand Up @@ -998,19 +1007,19 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,

// remove claimrefs from original and migrated-to PVs
w.Printf("Removing claimref from original PV %s\n", originalPVC.Spec.VolumeName)
err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
volume.Spec.ClaimRef = nil
return volume
return volume, nil
}, func(volume *corev1.PersistentVolume) bool {
return volume.Spec.ClaimRef == nil
})
if err != nil {
return fmt.Errorf("failed to remove claimrefs from PV %s: %w", originalPVC.Spec.VolumeName, err)
}
w.Printf("Removing claimref from migrated-to PV %s\n", migratedPVC.Spec.VolumeName)
err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
volume.Spec.ClaimRef = nil
return volume
return volume, nil
}, func(volume *corev1.PersistentVolume) bool {
return volume.Spec.ClaimRef == nil
})
Expand Down Expand Up @@ -1089,15 +1098,19 @@ func waitForDeletion(ctx context.Context, clientset k8sclient.Interface, pvcName
// If 'reclaim' is not specified and the annotation does not exist, the reclaim policy will not be updated.
// in either case, the annotation will be removed.
func resetReclaimPolicy(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, pvName string, reclaim *corev1.PersistentVolumeReclaimPolicy) error {
err := mutatePV(ctx, w, clientset, pvName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
err := mutatePV(ctx, w, clientset, pvName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
if volume.Spec.ClaimRef == nil {
return nil, fmt.Errorf("PV claimRef not set")
}

if reclaim != nil {
volume.Spec.PersistentVolumeReclaimPolicy = *reclaim
} else {
if annotationVal, ok := volume.Annotations[desiredReclaimAnnotation]; ok {
volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimPolicy(annotationVal)
}
}
return volume
return volume, nil
}, func(volume *corev1.PersistentVolume) bool {
if reclaim != nil {
return volume.Spec.PersistentVolumeReclaimPolicy == *reclaim
Expand Down
104 changes: 94 additions & 10 deletions pkg/migrate/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestMutatePV(t *testing.T) {
resources []runtime.Object
pvname string
wantErr bool
ttmutator func(volume *corev1.PersistentVolume) *corev1.PersistentVolume
ttmutator func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error)
ttchecker func(volume *corev1.PersistentVolume) bool
validate func(clientset k8sclient.Interface, t *testing.T) error
}{
Expand All @@ -236,9 +236,9 @@ func TestMutatePV(t *testing.T) {
},
},
},
ttmutator: func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
ttmutator: func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain
return volume
return volume, nil
},
ttchecker: func(volume *corev1.PersistentVolume) bool {
return volume.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimRetain
Expand Down Expand Up @@ -1217,18 +1217,55 @@ func Test_swapPVs(t *testing.T) {
sourceScName := "sourceScName"
destScName := "destScName"
tests := []struct {
name string
resources []runtime.Object
wantPVs []corev1.PersistentVolume
wantPVCs []corev1.PersistentVolumeClaim
ns string
pvcName string
wantErr bool
name string
resources []runtime.Object
wantPVs []corev1.PersistentVolume
wantPVCs []corev1.PersistentVolumeClaim
ns string
pvcName string
wantErr bool
backgroundFunc func(context.Context, *log.Logger, k8sclient.Interface)
}{
{
name: "swap one PVC",
ns: "testns",
pvcName: "sourcepvc",
backgroundFunc: func(ctx context.Context, logger *log.Logger, k k8sclient.Interface) {
// watch for the statefulset to be scaled down, and then delete the pod
for {
select {
case <-time.After(time.Second / 100):
// check statefulset, maybe delete pod
pvcs, err := k.CoreV1().PersistentVolumeClaims("testns").List(ctx, metav1.ListOptions{})
if err != nil {
logger.Printf("got listing PVCs: %s", err.Error())
continue
}

for _, pvc := range pvcs.Items {
if pvc.Spec.VolumeName != "" {
logger.Printf("setting pv %s claim ref to pvc %s", pvc.Spec.VolumeName, pvc.Name)
mutatePV(ctx, logger, k, pvc.Spec.VolumeName,
func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
volume.Spec.ClaimRef = &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: pvc.Name,
}
return volume, nil
},
func(volume *corev1.PersistentVolume) bool {
return true
},
)
}
}
case <-ctx.Done():
return
}
}
},
resources: []runtime.Object{
// two PVCs
&corev1.PersistentVolumeClaim{
Expand Down Expand Up @@ -1402,6 +1439,12 @@ func Test_swapPVs(t *testing.T) {
},
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimDelete,
StorageClassName: sourceScName,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
Status: corev1.PersistentVolumeStatus{
Phase: corev1.VolumeBound,
Expand Down Expand Up @@ -1442,6 +1485,11 @@ func Test_swapPVs(t *testing.T) {
req := require.New(t)
clientset := fake.NewSimpleClientset(tt.resources...)
testlog := log.New(testWriter{t: t}, "", 0)

if tt.backgroundFunc != nil {
go tt.backgroundFunc(context.Background(), testlog, clientset)
}

err := swapPVs(context.Background(), testlog, clientset, tt.ns, tt.pvcName)
if tt.wantErr {
req.Error(err)
Expand Down Expand Up @@ -1489,6 +1537,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand All @@ -1507,6 +1561,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimDelete,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand All @@ -1530,6 +1590,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand All @@ -1548,6 +1614,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRetain,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand All @@ -1569,6 +1641,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand All @@ -1586,6 +1664,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand Down

0 comments on commit fb7d619

Please sign in to comment.