Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DataImportCron PVC timestamping #2566

Merged
merged 1 commit into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/controller/dataimportcron-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdi

func (r *DataImportCronReconciler) updatePvc(ctx context.Context, cron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim) error {
pvcCopy := pvc.DeepCopy()
cc.AddAnnotation(pvc, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
r.setDataImportCronResourceLabels(cron, pvc)
if !reflect.DeepEqual(pvc, pvcCopy) {
if err := r.client.Update(ctx, pvc); err != nil {
Expand Down Expand Up @@ -567,8 +568,7 @@ func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, d
return err
}
} else {
cc.AddAnnotation(pvc, AnnLastUseTime, time.Now().Format(time.RFC3339Nano))
if err := r.client.Update(ctx, pvc); err != nil {
if err := r.updatePvc(ctx, dataImportCron, pvc); err != nil {
return err
}
}
Expand Down
124 changes: 84 additions & 40 deletions tests/dataimportcron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,14 @@ const (
dataImportCronTimeout = 4 * time.Minute
scheduleEveryMinute = "* * * * *"
scheduleOnceAYear = "0 0 1 1 *"
)

var (
importsToKeep int32 = 1
importsToKeep = 1
)

var _ = Describe("DataImportCron", func() {
var (
f = framework.NewFramework(namespacePrefix)
dataSourceName = "datasource-test"
cronName = "cron-test"
dvName = "dv-garbage"
cron *cdiv1.DataImportCron
ns string
)
Expand All @@ -54,25 +50,30 @@ var _ = Describe("DataImportCron", func() {
}
}

table.DescribeTable("should", func(garbageCollection, retention, createErrorDv bool, repeat int) {
waitForConditions := func(statusProgressing, statusUpToDate corev1.ConditionStatus) {
By(fmt.Sprintf("Wait for DataImportCron Progressing:%s, UpToDate:%s", statusProgressing, statusUpToDate))
Eventually(func() bool {
var err error
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
condProgressing := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronProgressing)
condUpToDate := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronUpToDate)
return condProgressing != nil && condProgressing.Status == statusProgressing &&
condUpToDate != nil && condUpToDate.Status == statusUpToDate
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "Timeout waiting for DataImportCron conditions")
}

table.DescribeTable("should", func(retention, createErrorDv bool, repeat int) {
reg, err := getDataVolumeSourceRegistry(f)
Expect(err).To(BeNil())
defer utils.RemoveInsecureRegistry(f.CrClient, *reg.URL)

By(fmt.Sprintf("Create labeled PVC %s for garbage collection test", dvName))
labels := map[string]string{common.DataImportCronLabel: cronName}
pvc := utils.NewPVCDefinition(dvName, "5Gi", nil, labels)
f.CreateBoundPVCFromDefinition(pvc)

By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "5Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)

expectedImports := int(importsToKeep)
if !garbageCollection {
garbageCollect := cdiv1.DataImportCronGarbageCollectNever
cron.Spec.GarbageCollect = &garbageCollect
expectedImports = 2
}
garbageCollect := cdiv1.DataImportCronGarbageCollectNever
cron.Spec.GarbageCollect = &garbageCollect

if !retention {
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
Expand All @@ -90,19 +91,6 @@ var _ = Describe("DataImportCron", func() {
return true
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob was not created")

waitForConditions := func(statusProgressing, statusUpToDate corev1.ConditionStatus) {
By(fmt.Sprintf("Wait for DataImportCron Progressing:%s, UpToDate:%s", statusProgressing, statusUpToDate))
Eventually(func() bool {
var err error
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
condProgressing := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronProgressing)
condUpToDate := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronUpToDate)
return condProgressing != nil && condProgressing.Status == statusProgressing &&
condUpToDate != nil && condUpToDate.Status == statusUpToDate
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "Timeout waiting for DataImportCron conditions")
}

var lastImportDv, currentImportDv string
for i := 0; i < repeat; i++ {
By(fmt.Sprintf("Iter #%d", i))
Expand Down Expand Up @@ -208,12 +196,6 @@ var _ = Describe("DataImportCron", func() {
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
}
By("Check garbage collection")
Eventually(func() int {
pvcList, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
return len(pvcList.Items)
}, dataImportCronTimeout, pollingInterval).Should(Equal(expectedImports), "Garbage collection failed cleaning old imports")

lastImportedPVC := cron.Status.LastImportedPVC

Expand Down Expand Up @@ -244,12 +226,74 @@ var _ = Describe("DataImportCron", func() {
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "PVCs were not deleted")
}
},
table.Entry("[test_id:7403] succeed importing initial PVC from registry URL", false, true, false, 1),
table.Entry("[test_id:7414] succeed importing PVC from registry URL on source digest update", false, true, false, 2),
table.Entry("[test_id:7406] succeed garbage collecting old PVCs when importing new ones", true, false, false, 2),
table.Entry("[test_id:8266] succeed deleting error DVs when importing new ones", true, false, true, 2),
table.Entry("[test_id:7403] succeed importing initial PVC from registry URL", true, false, 1),
table.Entry("[test_id:7414] succeed importing PVC from registry URL on source digest update", true, false, 2),
table.Entry("[test_id:8266] succeed deleting error DVs when importing new ones", false, true, 2),
)

It("[test_id:7406] succeed garbage collecting old PVCs when importing new ones", func() {
reg, err := getDataVolumeSourceRegistry(f)
Expect(err).To(BeNil())
defer utils.RemoveInsecureRegistry(f.CrClient, *reg.URL)

garbagePVCs := 3
for i := 0; i < garbagePVCs; i++ {
pvcName := fmt.Sprintf("pvc-garbage-%d", i)
By(fmt.Sprintf("Create %s", pvcName))
pvc := utils.NewPVCDefinition(pvcName, "1Gi",
map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)},
map[string]string{common.DataImportCronLabel: cronName})
f.CreateBoundPVCFromDefinition(pvc)
}

pvcList, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(pvcList.Items).To(HaveLen(garbagePVCs))

By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "1Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy

cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())

waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())

By(fmt.Sprintf("Verify pvc was created %s", currentImportDv))
currentPvc, err := utils.WaitForPVC(f.K8sClient, ns, currentImportDv)
Expect(err).ToNot(HaveOccurred())

By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")

By("Check garbage collection")
Eventually(func() int {
pvcList, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
return len(pvcList.Items)
}, 10*time.Second, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")

By("Check last import PVC is timestamped and not garbage collected")
pvcFound := false
for _, pvc := range pvcList.Items {
if pvc.UID == currentPvc.UID {
lastUse := pvc.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
pvcFound = true
break
}
}
Expect(pvcFound).To(BeTrue())
})

It("[test_id:8033] should delete jobs on deletion", func() {
reg, err := getDataVolumeSourceRegistry(f)
Expect(err).To(BeNil())
Expand Down