Skip to content

Commit

Permalink
[release-v1.49] Add DataImportCron CronJobs Proxy support (#2462)
Browse files Browse the repository at this point in the history
Manual backport of #2455

Signed-off-by: Arnon Gilboa <[email protected]>

Signed-off-by: Arnon Gilboa <[email protected]>
  • Loading branch information
arnongilboa authored Nov 10, 2022
1 parent b855819 commit c2d8871
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 138 deletions.
2 changes: 2 additions & 0 deletions pkg/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_library(
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
"//vendor/kubevirt.io/controller-lifecycle-operator-sdk/api:go_default_library",
"//vendor/kubevirt.io/controller-lifecycle-operator-sdk/pkg/sdk:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/cache:go_default_library",
Expand Down Expand Up @@ -130,6 +131,7 @@ go_test(
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/cluster-bootstrap/token/api:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
"//vendor/kubevirt.io/controller-lifecycle-operator-sdk/api:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/client:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/client/fake:go_default_library",
Expand Down
64 changes: 33 additions & 31 deletions pkg/controller/dataimportcron-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -774,14 +775,25 @@ func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batc
TerminationMessagePolicy: corev1.TerminationMessageReadFile,
}

volumes := []corev1.Volume{}
hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
if hasCertConfigMap {
vm := corev1.VolumeMount{
Name: CertVolName,
MountPath: common.ImporterCertDir,
}
container.VolumeMounts = []corev1.VolumeMount{vm}
container.VolumeMounts = append(container.VolumeMounts, vm)
container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
}

if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
vm := corev1.VolumeMount{
Name: ProxyCertVolName,
MountPath: common.ImporterProxyCertDir,
}
container.VolumeMounts = append(container.VolumeMounts, vm)
volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
}

if regSource.SecretRef != nil && *regSource.SecretRef != "" {
Expand Down Expand Up @@ -811,20 +823,25 @@ func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batc
)
}

addEnvVar := func(varName, value string) {
container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
}

if insecureTLS {
container.Env = append(container.Env,
corev1.EnvVar{
Name: common.InsecureTLSVar,
Value: "true",
},
)
addEnvVar(common.InsecureTLSVar, "true")
}

successfulJobsHistoryLimit := int32(0)
failedJobsHistoryLimit := int32(0)
ttlSecondsAfterFinished := int32(0)
backoffLimit := int32(2)
gracePeriodSeconds := int64(0)
addEnvVarFromImportProxyConfig := func(varName string) {
if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
addEnvVar(varName, value)
} else {
r.log.Info("Missing", varName, err.Error())
}
}

addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)

cronJobName := GetCronJobName(cron)
cronJob := &batchv1.CronJob{
Expand All @@ -835,39 +852,24 @@ func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batc
Spec: batchv1.CronJobSpec{
Schedule: cron.Spec.Schedule,
ConcurrencyPolicy: batchv1.ForbidConcurrent,
SuccessfulJobsHistoryLimit: &successfulJobsHistoryLimit,
FailedJobsHistoryLimit: &failedJobsHistoryLimit,
SuccessfulJobsHistoryLimit: pointer.Int32(1),
JobTemplate: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
TerminationGracePeriodSeconds: &gracePeriodSeconds,
TerminationGracePeriodSeconds: pointer.Int64(0),
Containers: []corev1.Container{container},
ServiceAccountName: "cdi-cronjob",
Volumes: volumes,
},
},
TTLSecondsAfterFinished: &ttlSecondsAfterFinished,
BackoffLimit: &backoffLimit,
BackoffLimit: pointer.Int32(2),
},
},
},
}

if hasCertConfigMap {
vol := corev1.Volume{
Name: CertVolName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: *regSource.CertConfigMap,
},
},
},
}
cronJob.Spec.JobTemplate.Spec.Template.Spec.Volumes = []corev1.Volume{vol}
}

if err := r.setJobCommon(cron, cronJob); err != nil {
return nil, err
}
Expand Down
89 changes: 85 additions & 4 deletions pkg/controller/dataimportcron-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand All @@ -47,8 +48,12 @@ import (
)

var (
cronLog = logf.Log.WithName("data-import-cron-controller-test")
cronName = "test-cron"
cronLog = logf.Log.WithName("data-import-cron-controller-test")
cronName = "test-cron"
httpProxy = "test-http-proxy"
httpsProxy = "test-https-proxy"
noProxy = "test-no-proxy"
trustedCAProxy = "test-trusted-ca-proxy"
)

const (
Expand Down Expand Up @@ -178,14 +183,49 @@ var _ = Describe("All DataImportCron Tests", func() {
})

It("Should create and delete CronJob if DataImportCron is created and deleted", func() {
cdiConfig := MakeEmptyCDIConfigSpec(common.ConfigName)
cdiConfig.Status.ImportProxy = &cdiv1.ImportProxy{
HTTPProxy: &httpProxy,
HTTPSProxy: &httpsProxy,
NoProxy: &noProxy,
TrustedCAProxy: &trustedCAProxy,
}

cron = newDataImportCron(cronName)
reconciler = createDataImportCronReconciler(cron)
reconciler = createDataImportCronReconcilerWithoutConfig(cron, cdiConfig)
_, err := reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())

cronjob := &batchv1.CronJob{}
err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
Expect(err).ToNot(HaveOccurred())
containers := cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers
Expect(containers).To(HaveLen(1))

env := containers[0].Env
Expect(getEnvVar(env, common.ImportProxyHTTP)).To(Equal(httpProxy))
Expect(getEnvVar(env, common.ImportProxyHTTPS)).To(Equal(httpsProxy))
Expect(getEnvVar(env, common.ImportProxyNoProxy)).To(Equal(noProxy))

volMounts := containers[0].VolumeMounts
Expect(volMounts).To(HaveLen(1))
Expect(volMounts[0]).To(Equal(corev1.VolumeMount{
Name: ProxyCertVolName,
MountPath: common.ImporterProxyCertDir,
}))

volumes := cronjob.Spec.JobTemplate.Spec.Template.Spec.Volumes
Expect(volumes).To(HaveLen(1))
Expect(volumes[0]).To(Equal(corev1.Volume{
Name: ProxyCertVolName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: trustedCAProxy,
},
},
},
}))

err = reconciler.client.Get(context.TODO(), cronKey, cron)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -202,6 +242,32 @@ var _ = Describe("All DataImportCron Tests", func() {
Expect(err).To(HaveOccurred())
})

It("Should verify CronJob container env variables are empty and no extra volume is set when proxy is not configured", func() {
cron = newDataImportCron(cronName)
reconciler = createDataImportCronReconciler(cron)
_, err := reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())

cronjob := &batchv1.CronJob{}
err = reconciler.client.Get(context.TODO(), cronJobKey(cron), cronjob)
Expect(err).ToNot(HaveOccurred())

Expect(cronjob.Spec.SuccessfulJobsHistoryLimit).To(Equal(pointer.Int32(1)))
Expect(cronjob.Spec.FailedJobsHistoryLimit).To(BeNil())

jobTemplateSpec := cronjob.Spec.JobTemplate.Spec.Template.Spec
containers := jobTemplateSpec.Containers
Expect(containers).To(HaveLen(1))

env := containers[0].Env
Expect(getEnvVar(env, common.ImportProxyHTTP)).To(BeEmpty())
Expect(getEnvVar(env, common.ImportProxyHTTPS)).To(BeEmpty())
Expect(getEnvVar(env, common.ImportProxyNoProxy)).To(BeEmpty())

Expect(containers[0].VolumeMounts).To(HaveLen(0))
Expect(jobTemplateSpec.Volumes).To(HaveLen(0))
})

It("Should update CronJob on reconcile", func() {
cron = newDataImportCron(cronName)
reconciler = createDataImportCronReconciler(cron)
Expand Down Expand Up @@ -469,8 +535,14 @@ var _ = Describe("untagURL", func() {

func createDataImportCronReconciler(objects ...runtime.Object) *DataImportCronReconciler {
cdiConfig := MakeEmptyCDIConfigSpec(common.ConfigName)
objs := []runtime.Object{cdiConfig}
objs = append(objs, objects...)
return createDataImportCronReconcilerWithoutConfig(objs...)
}

func createDataImportCronReconcilerWithoutConfig(objects ...runtime.Object) *DataImportCronReconciler {
crd := &extv1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: "dataimportcrons.cdi.kubevirt.io"}}
objs := []runtime.Object{cdiConfig, crd}
objs := []runtime.Object{crd}
objs = append(objs, objects...)

s := scheme.Scheme
Expand Down Expand Up @@ -569,3 +641,12 @@ func verifyConditionState(condType string, condState cdiv1.ConditionState, desir
Expect(condState.Status).To(Equal(desiredStatus))
Expect(condState.Reason).To(Equal(desiredReason))
}

func getEnvVar(env []corev1.EnvVar, name string) string {
for _, envVar := range env {
if envVar.Name == name {
return envVar.Value
}
}
return ""
}
18 changes: 3 additions & 15 deletions pkg/controller/import-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,20 +1164,8 @@ func makeImporterPodSpec(args *importerPodArgs) *corev1.Pod {
Name: CertVolName,
MountPath: common.ImporterCertDir,
}

vol := corev1.Volume{
Name: CertVolName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: args.podEnvVar.certConfigMap,
},
},
},
}

pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, vm)
pod.Spec.Volumes = append(pod.Spec.Volumes, vol)
pod.Spec.Volumes = append(pod.Spec.Volumes, createConfigMapVolume(CertVolName, args.podEnvVar.certConfigMap))
}

if args.podEnvVar.certConfigMapProxy != "" {
Expand All @@ -1186,7 +1174,7 @@ func makeImporterPodSpec(args *importerPodArgs) *corev1.Pod {
MountPath: common.ImporterProxyCertDir,
}
pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, vm)
pod.Spec.Volumes = append(pod.Spec.Volumes, createProxyConfigMapVolume(ProxyCertVolName, args.podEnvVar.certConfigMapProxy))
pod.Spec.Volumes = append(pod.Spec.Volumes, createConfigMapVolume(ProxyCertVolName, args.podEnvVar.certConfigMapProxy))
}

for index, header := range args.podEnvVar.secretExtraHeaders {
Expand Down Expand Up @@ -1261,7 +1249,7 @@ func makeImporterContainerSpec(image, verbose, pullPolicy string) *corev1.Contai
}
}

func createProxyConfigMapVolume(certVolName, objRef string) corev1.Volume {
func createConfigMapVolume(certVolName, objRef string) corev1.Volume {
return corev1.Volume{
Name: certVolName,
VolumeSource: corev1.VolumeSource{
Expand Down
51 changes: 28 additions & 23 deletions pkg/importer/registry-datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package importer

import (
"fmt"
"io/ioutil"
"net/url"
"os"
Expand Down Expand Up @@ -54,7 +53,7 @@ type RegistryDataSource struct {

// NewRegistryDataSource creates a new instance of the Registry Data Source.
func NewRegistryDataSource(endpoint, accessKey, secKey, certDir string, insecureTLS bool) *RegistryDataSource {
allCertDir, err := createCertificateDir(certDir)
allCertDir, err := CreateCertificateDir(certDir)
if err != nil {
if allCertDir != "/" {
err = os.RemoveAll(allCertDir)
Expand Down Expand Up @@ -163,33 +162,39 @@ func getImageFileName(dir string) (string, error) {
return filename, nil
}

func createCertificateDir(registryCertDir string) (string, error) {
allCerts := "/all_certs"
err := os.MkdirAll(allCerts, 0777)
if err != nil {
// CreateCertificateDir creates a common certificate dir
func CreateCertificateDir(registryCertDir string) (string, error) {
allCerts := "/tmp/all_certs"
if err := os.MkdirAll(allCerts, 0700); err != nil {
return allCerts, err
}
klog.Info("Copying proxy certs")
directory, _ := os.Open(common.ImporterProxyCertDir)
objects, err := directory.Readdir(-1)
for _, obj := range objects {
if strings.HasSuffix(obj.Name(), ".crt") {
err = util.LinkFile(filepath.Join(common.ImporterProxyCertDir, obj.Name()), filepath.Join(allCerts, fmt.Sprintf("proxy-%s", obj.Name())))
if err != nil {
return allCerts, err
}
}
if err := collectCerts(common.ImporterProxyCertDir, allCerts, "proxy-"); err != nil {
return allCerts, err
}
klog.Info("Copying registry certs")
directory, _ = os.Open(registryCertDir)
objects, err = directory.Readdir(-1)
if err := collectCerts(registryCertDir, allCerts, ""); err != nil {
return allCerts, err
}
return allCerts, nil
}

func collectCerts(certDir, targetDir, targetPrefix string) error {
directory, err := os.Open(certDir)
if err != nil {
return err
}
objects, err := directory.Readdir(-1)
if err != nil {
return err
}
for _, obj := range objects {
if strings.HasSuffix(obj.Name(), ".crt") {
err = util.LinkFile(filepath.Join(registryCertDir, obj.Name()), filepath.Join(allCerts, obj.Name()))
if err != nil {
return allCerts, err
}
if !strings.HasSuffix(obj.Name(), ".crt") {
continue
}
if err := util.LinkFile(filepath.Join(certDir, obj.Name()), filepath.Join(targetDir, targetPrefix+obj.Name())); err != nil {
return err
}
}
return allCerts, nil
return nil
}
Loading

0 comments on commit c2d8871

Please sign in to comment.