Skip to content

Commit

Permalink
Fix DataImportCron PVC timestamping (#2567)
Browse files Browse the repository at this point in the history
Manual backport of #2566.

DataImportCron PVC garbage collection is LRU-based, so it's broken when
PVCs are not timestamped.

Signed-off-by: Arnon Gilboa <[email protected]>
  • Loading branch information
arnongilboa authored Feb 1, 2023
1 parent eb1a896 commit e227635
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 42 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/dataimportcron-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdi

func (r *DataImportCronReconciler) updatePvc(ctx context.Context, cron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim) error {
pvcCopy := pvc.DeepCopy()
AddAnnotation(pvc, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
r.setDataImportCronResourceLabels(cron, pvc)
if !reflect.DeepEqual(pvc, pvcCopy) {
if err := r.client.Update(ctx, pvc); err != nil {
Expand Down Expand Up @@ -559,8 +560,7 @@ func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, d
return err
}
} else {
AddAnnotation(pvc, AnnLastUseTime, time.Now().Format(time.RFC3339Nano))
if err := r.client.Update(ctx, pvc); err != nil {
if err := r.updatePvc(ctx, dataImportCron, pvc); err != nil {
return err
}
}
Expand Down
124 changes: 84 additions & 40 deletions tests/dataimportcron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,14 @@ const (
dataImportCronTimeout = 4 * time.Minute
scheduleEveryMinute = "* * * * *"
scheduleOnceAYear = "0 0 1 1 *"
)

var (
importsToKeep int32 = 1
importsToKeep = 1
)

var _ = Describe("DataImportCron", func() {
var (
f = framework.NewFramework(namespacePrefix)
dataSourceName = "datasource-test"
cronName = "cron-test"
dvName = "dv-garbage"
cron *cdiv1.DataImportCron
ns string
)
Expand All @@ -54,25 +50,30 @@ var _ = Describe("DataImportCron", func() {
}
}

table.DescribeTable("should", func(garbageCollection, retention, createErrorDv bool, repeat int) {
waitForConditions := func(statusProgressing, statusUpToDate corev1.ConditionStatus) {
By(fmt.Sprintf("Wait for DataImportCron Progressing:%s, UpToDate:%s", statusProgressing, statusUpToDate))
Eventually(func() bool {
var err error
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
condProgressing := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronProgressing)
condUpToDate := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronUpToDate)
return condProgressing != nil && condProgressing.Status == statusProgressing &&
condUpToDate != nil && condUpToDate.Status == statusUpToDate
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "Timeout waiting for DataImportCron conditions")
}

table.DescribeTable("should", func(retention, createErrorDv bool, repeat int) {
reg, err := getDataVolumeSourceRegistry(f)
Expect(err).To(BeNil())
defer utils.RemoveInsecureRegistry(f.CrClient, *reg.URL)

By(fmt.Sprintf("Create labeled PVC %s for garbage collection test", dvName))
labels := map[string]string{common.DataImportCronLabel: cronName}
pvc := utils.NewPVCDefinition(dvName, "5Gi", nil, labels)
f.CreateBoundPVCFromDefinition(pvc)

By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "5Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)

expectedImports := int(importsToKeep)
if !garbageCollection {
garbageCollect := cdiv1.DataImportCronGarbageCollectNever
cron.Spec.GarbageCollect = &garbageCollect
expectedImports = 2
}
garbageCollect := cdiv1.DataImportCronGarbageCollectNever
cron.Spec.GarbageCollect = &garbageCollect

if !retention {
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
Expand All @@ -90,19 +91,6 @@ var _ = Describe("DataImportCron", func() {
return true
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob was not created")

waitForConditions := func(statusProgressing, statusUpToDate corev1.ConditionStatus) {
By(fmt.Sprintf("Wait for DataImportCron Progressing:%s, UpToDate:%s", statusProgressing, statusUpToDate))
Eventually(func() bool {
var err error
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
condProgressing := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronProgressing)
condUpToDate := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronUpToDate)
return condProgressing != nil && condProgressing.Status == statusProgressing &&
condUpToDate != nil && condUpToDate.Status == statusUpToDate
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "Timeout waiting for DataImportCron conditions")
}

var lastImportDv, currentImportDv string
for i := 0; i < repeat; i++ {
By(fmt.Sprintf("Iter #%d", i))
Expand Down Expand Up @@ -208,12 +196,6 @@ var _ = Describe("DataImportCron", func() {
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
}
By("Check garbage collection")
Eventually(func() int {
pvcList, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
return len(pvcList.Items)
}, dataImportCronTimeout, pollingInterval).Should(Equal(expectedImports), "Garbage collection failed cleaning old imports")

lastImportedPVC := cron.Status.LastImportedPVC

Expand Down Expand Up @@ -244,12 +226,74 @@ var _ = Describe("DataImportCron", func() {
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "PVCs were not deleted")
}
},
table.Entry("[test_id:7403] succeed importing initial PVC from registry URL", false, true, false, 1),
table.Entry("[test_id:7414] succeed importing PVC from registry URL on source digest update", false, true, false, 2),
table.Entry("[test_id:7406] succeed garbage collecting old PVCs when importing new ones", true, false, false, 2),
table.Entry("[test_id:8266] succeed deleting error DVs when importing new ones", true, false, true, 2),
table.Entry("[test_id:7403] succeed importing initial PVC from registry URL", true, false, 1),
table.Entry("[test_id:7414] succeed importing PVC from registry URL on source digest update", true, false, 2),
table.Entry("[test_id:8266] succeed deleting error DVs when importing new ones", false, true, 2),
)

It("[test_id:7406] succeed garbage collecting old PVCs when importing new ones", func() {
reg, err := getDataVolumeSourceRegistry(f)
Expect(err).To(BeNil())
defer utils.RemoveInsecureRegistry(f.CrClient, *reg.URL)

garbagePVCs := 3
for i := 0; i < garbagePVCs; i++ {
pvcName := fmt.Sprintf("pvc-garbage-%d", i)
By(fmt.Sprintf("Create %s", pvcName))
pvc := utils.NewPVCDefinition(pvcName, "1Gi",
map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)},
map[string]string{common.DataImportCronLabel: cronName})
f.CreateBoundPVCFromDefinition(pvc)
}

pvcList, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(pvcList.Items).To(HaveLen(garbagePVCs))

By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "1Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy

cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())

waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())

By(fmt.Sprintf("Verify pvc was created %s", currentImportDv))
currentPvc, err := utils.WaitForPVC(f.K8sClient, ns, currentImportDv)
Expect(err).ToNot(HaveOccurred())

By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")

By("Check garbage collection")
Eventually(func() int {
pvcList, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
return len(pvcList.Items)
}, dataImportCronTimeout, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")

By("Check last import PVC is timestamped and not garbage collected")
pvcFound := false
for _, pvc := range pvcList.Items {
if pvc.UID == currentPvc.UID {
lastUse := pvc.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
pvcFound = true
break
}
}
Expect(pvcFound).To(BeTrue())
})

It("[test_id:8033] should delete jobs on deletion", func() {
reg, err := getDataVolumeSourceRegistry(f)
Expect(err).To(BeNil())
Expand Down

0 comments on commit e227635

Please sign in to comment.