From 605772dc20ba8602fc2e516083cadf048f040a3e Mon Sep 17 00:00:00 2001 From: Carolyn Van Slyck Date: Wed, 27 Jan 2021 16:59:42 -0600 Subject: [PATCH 1/7] Support outputs in the kubernetes driver Add the JOB_VOLUME_NAME setting, which is the name of the PVC to mount to /cnab/app/outputs so that outputs from the bundle can be persisted. JOB_VOLUME_PATH specifies where the volume is mounted locally. When the bundle defines outputs, but does not specify a PVC, the driver returns an error. Signed-off-by: Carolyn Van Slyck --- Makefile | 2 +- azure-pipelines.yml | 2 +- driver/kubernetes/kubernetes.go | 102 +++++++++++++++++++++++---- driver/kubernetes/kubernetes_test.go | 57 +++++++++++++++ e2e-kind.sh | 2 +- 5 files changed, 150 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index babf9132..566f32e5 100644 --- a/Makefile +++ b/Makefile @@ -40,7 +40,7 @@ ifndef HAS_GOLANGCI curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(GOPATH)/bin $(GOLANGCI_VERSION) endif ifndef HAS_KIND - go get sigs.k8s.io/kind@v0.6.0 + go get sigs.k8s.io/kind@v0.10.0 endif ifndef HAS_KUBECTL echo "Follow instructions at https://kubernetes.io/docs/tasks/tools/install-kubectl/ to install kubectl." 
diff --git a/azure-pipelines.yml b/azure-pipelines.yml index cba5cdbb..cecfee57 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -17,7 +17,7 @@ steps: set -xeuo pipefail go env go mod download - go get sigs.k8s.io/kind@v0.6.0 + go get sigs.k8s.io/kind@v0.10.0 sudo make bootstrap make fetch-schemas build lint coverage GOOS=windows make build diff --git a/driver/kubernetes/kubernetes.go b/driver/kubernetes/kubernetes.go index 2a0d2514..023b3b5b 100644 --- a/driver/kubernetes/kubernetes.go +++ b/driver/kubernetes/kubernetes.go @@ -3,8 +3,10 @@ package kubernetes import ( "fmt" "io" + "io/ioutil" "log" "os" + "path" "path/filepath" "regexp" "strconv" @@ -12,6 +14,7 @@ import ( "time" "github.com/docker/distribution/reference" + "github.com/hashicorp/go-multierror" "github.com/opencontainers/go-digest" "github.com/pkg/errors" batchv1 "k8s.io/api/batch/v1" @@ -49,6 +52,8 @@ type Driver struct { Annotations map[string]string LimitCPU resource.Quantity LimitMemory resource.Quantity + JobVolumePath string + JobVolumeName string Tolerations []v1.Toleration ActiveDeadlineSeconds int64 BackoffLimit int32 @@ -82,6 +87,8 @@ func (k *Driver) Config() map[string]string { return map[string]string{ "IN_CLUSTER": "Connect to the cluster using in-cluster environment variables", "CLEANUP_JOBS": "If true, the job and associated secrets will be destroyed when it finishes running. If false, it will not be destroyed. The supported values are true and false. 
Defaults to true.", + "JOB_VOLUME_PATH": "Path where the JOB_VOLUME_NAME is mounted locally", + "JOB_VOLUME_NAME": "Name of the PersistentVolumeClaim to mount with the invocation image to persist the bundle outputs.", "KUBE_NAMESPACE": "Kubernetes namespace in which to run the invocation image", "SERVICE_ACCOUNT": "Kubernetes service account to be mounted by the invocation image (if empty, no service account token will be mounted)", "KUBECONFIG": "Absolute path to the kubeconfig file", @@ -94,6 +101,8 @@ func (k *Driver) SetConfig(settings map[string]string) error { k.setDefaults() k.Namespace = settings["KUBE_NAMESPACE"] k.ServiceAccountName = settings["SERVICE_ACCOUNT"] + k.JobVolumePath = settings["JOB_VOLUME_PATH"] + k.JobVolumeName = settings["JOB_VOLUME_NAME"] cleanup, err := strconv.ParseBool(settings["CLEANUP_JOBS"]) if err != nil { @@ -246,10 +255,33 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { }, }, }) - container.VolumeMounts = mounts + container.VolumeMounts = append(container.VolumeMounts, mounts...) 
+ } + + // Mount a volume to store the bundle outputs + if k.JobVolumeName != "" { + if k.JobVolumePath == "" { + return driver.OperationResult{}, errors.New("no JobVolumePath was specified but JobVolumeName was provided") + } + + job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, v1.Volume{ + Name: "outputs", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: k.JobVolumeName, + }, + }, + }) + container.VolumeMounts = append(container.VolumeMounts, v1.VolumeMount{ + Name: "outputs", + MountPath: "/cnab/app/outputs", + }) + } else if len(op.Bundle.Outputs) > 0 { + return driver.OperationResult{}, errors.New("no PersistentVolumeClaim was specified for JobVolumeName but the bundle defines outputs") } job.Spec.Template.Spec.Containers = []v1.Container{container} + job, err = k.jobs.Create(job) if err != nil { return driver.OperationResult{}, err @@ -258,24 +290,70 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { defer k.deleteJob(job.ObjectMeta.Name) } - // Return early for unit testing purposes (the fake k8s client implementation just + // Skip waiting for the job in unit tests (the fake k8s client implementation just // hangs during watch because no events are ever created on the Job) - if k.skipJobStatusCheck { - return driver.OperationResult{}, nil + var opErr *multierror.Error + if !k.skipJobStatusCheck { + // Create a selector to detect the job just created + jobSelector := metav1.ListOptions{ + LabelSelector: labels.Set(job.ObjectMeta.Labels).String(), + FieldSelector: newSingleFieldSelector("metadata.name", job.ObjectMeta.Name), + } + + // Prevent detecting pods from prior jobs by adding the job name to the labels + podSelector := metav1.ListOptions{ + LabelSelector: newSingleFieldSelector("job-name", job.ObjectMeta.Name), + } + + err = k.watchJobStatusAndLogs(podSelector, jobSelector, op.Out) + if err != nil { + opErr = multierror.Append(opErr, 
errors.Wrapf(err, "job %s failed", job.Name)) + } + } + + opResult, err := k.fetchOutputs(op) + if err != nil { + opErr = multierror.Append(opErr, err) } - // Create a selector to detect the job just created - jobSelector := metav1.ListOptions{ - LabelSelector: labels.Set(job.ObjectMeta.Labels).String(), - FieldSelector: newSingleFieldSelector("metadata.name", job.ObjectMeta.Name), + return opResult, opErr.ErrorOrNil() +} + +// fetchOutputs collects any outputs created by the job that were persisted to JobVolumeName (which is mounted locally +// at JobVolumePath). +// +// The goal is to collect all the files in the directory (recursively) and put them in a flat map of path to contents. +// This map will be inside the OperationResult. When fetchOutputs returns an error, it may also return partial results. +func (k *Driver) fetchOutputs(op *driver.Operation) (driver.OperationResult, error) { + opResult := driver.OperationResult{ + Outputs: map[string]string{}, } - // Prevent detecting pods from prior jobs by adding the job name to the labels - podSelector := metav1.ListOptions{ - LabelSelector: newSingleFieldSelector("job-name", job.ObjectMeta.Name), + if len(op.Bundle.Outputs) == 0 { + return opResult, nil } - return driver.OperationResult{}, k.watchJobStatusAndLogs(podSelector, jobSelector, op.Out) + err := filepath.Walk(k.JobVolumePath, func(path string, info os.FileInfo, err error) error { + // skip directories because we're gathering file contents + if info.IsDir() { + return nil + } + + var contents []byte + pathInContainer := path.Join("/cnab/app/outputs", info.Name()) + outputName, shouldCapture := op.Outputs[pathInContainer] + if shouldCapture { + contents, err = ioutil.ReadFile(path) + if err != nil { + return errors.Wrapf(err, "error while reading %q from outputs", pathInContainer) + } + opResult.Outputs[outputName] = string(contents) + } + + return nil + }) + + return opResult, err } func (k *Driver) watchJobStatusAndLogs(podSelector metav1.ListOptions, 
jobSelector metav1.ListOptions, out io.Writer) error { diff --git a/driver/kubernetes/kubernetes_test.go b/driver/kubernetes/kubernetes_test.go index 2ed6510a..ff69602a 100644 --- a/driver/kubernetes/kubernetes_test.go +++ b/driver/kubernetes/kubernetes_test.go @@ -1,7 +1,9 @@ package kubernetes import ( + "io/ioutil" "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" @@ -26,6 +28,7 @@ func TestDriver_Run(t *testing.T) { } op := driver.Operation{ Action: "install", + Bundle: &bundle.Bundle{}, Image: bundle.InvocationImage{BaseImage: bundle.BaseImage{Image: "foo/bar"}}, Out: os.Stdout, Environment: map[string]string{ @@ -43,6 +46,60 @@ func TestDriver_Run(t *testing.T) { assert.Equal(t, len(secretList.Items), 1, "expected one secret to be created") } +func TestDriver_RunWithOutputs(t *testing.T) { + // Simulate that the bundle generated output "foo" + outputs, err := ioutil.TempDir("", "cnab-go") + require.NoError(t, err, "could not create test directory") + defer os.RemoveAll(outputs) + + err = ioutil.WriteFile(filepath.Join(outputs, "foo"), []byte("foobar"), 0644) + require.NoError(t, err, "could not write output foo") + + client := fake.NewSimpleClientset() + namespace := "default" + k := Driver{ + Namespace: namespace, + jobs: client.BatchV1().Jobs(namespace), + secrets: client.CoreV1().Secrets(namespace), + pods: client.CoreV1().Pods(namespace), + JobVolumePath: outputs, + JobVolumeName: "outputs", + SkipCleanup: true, + skipJobStatusCheck: true, + } + op := driver.Operation{ + Action: "install", + Image: bundle.InvocationImage{BaseImage: bundle.BaseImage{Image: "foo/bar"}}, + Bundle: &bundle.Bundle{ + Outputs: map[string]bundle.Output{ + "foo": { + Definition: "foo", + Path: "/cnab/app/outputs/foo", + }, + }, + }, + Out: os.Stdout, + Outputs: map[string]string{ + "/cnab/app/outputs/foo": "foo", + }, + Environment: map[string]string{ + "foo": "bar", + }, + } + + opResult, err := k.Run(&op) + require.NoError(t, err) + + jobList, _ := 
k.jobs.List(metav1.ListOptions{}) + assert.Equal(t, len(jobList.Items), 1, "expected one job to be created") + + secretList, _ := k.secrets.List(metav1.ListOptions{}) + assert.Equal(t, len(secretList.Items), 1, "expected one secret to be created") + + require.Contains(t, opResult.Outputs, "foo", "expected the foo output to be collected") + assert.Equal(t, "foobar", opResult.Outputs["foo"], "invalid output value for foo ") +} + func TestImageWithDigest(t *testing.T) { testCases := map[string]bundle.InvocationImage{ "foo": { diff --git a/e2e-kind.sh b/e2e-kind.sh index 7e6e1d83..486ce495 100755 --- a/e2e-kind.sh +++ b/e2e-kind.sh @@ -9,7 +9,7 @@ set -o pipefail CLUSTER_NAME=cnab-go-testing readonly CLUSTER_NAME -K8S_VERSION=v1.15.3 +K8S_VERSION=v1.19.1 readonly K8S_VERSION KIND_KUBECONFIG="$PWD/kind-kubeconfig.yaml" From e49538d146d3fc4807772e64df6d1d2c25eb35c8 Mon Sep 17 00:00:00 2001 From: Carolyn Van Slyck Date: Wed, 27 Jan 2021 17:01:15 -0600 Subject: [PATCH 2/7] Apply custom labels in the kubernetes driver Allow the caller to specify additional labels to apply to resources created by the driver. Signed-off-by: Carolyn Van Slyck --- driver/kubernetes/kubernetes.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/driver/kubernetes/kubernetes.go b/driver/kubernetes/kubernetes.go index 023b3b5b..823b9961 100644 --- a/driver/kubernetes/kubernetes.go +++ b/driver/kubernetes/kubernetes.go @@ -50,6 +50,7 @@ type Driver struct { Namespace string ServiceAccountName string Annotations map[string]string + Labels []string LimitCPU resource.Quantity LimitMemory resource.Quantity JobVolumePath string @@ -87,6 +88,7 @@ func (k *Driver) Config() map[string]string { return map[string]string{ "IN_CLUSTER": "Connect to the cluster using in-cluster environment variables", "CLEANUP_JOBS": "If true, the job and associated secrets will be destroyed when it finishes running. If false, it will not be destroyed. The supported values are true and false. 
Defaults to true.", + "LABELS": "Labels to apply to cluster resources created by the driver, separated by whitespace.", "JOB_VOLUME_PATH": "Path where the JOB_VOLUME_NAME is mounted locally", "JOB_VOLUME_NAME": "Name of the PersistentVolumeClaim to mount with the invocation image to persist the bundle outputs.", "KUBE_NAMESPACE": "Kubernetes namespace in which to run the invocation image", @@ -101,6 +103,7 @@ func (k *Driver) SetConfig(settings map[string]string) error { k.setDefaults() k.Namespace = settings["KUBE_NAMESPACE"] k.ServiceAccountName = settings["SERVICE_ACCOUNT"] + k.Labels = strings.Split(settings["LABELS"], " ") k.JobVolumePath = settings["JOB_VOLUME_PATH"] k.JobVolumeName = settings["JOB_VOLUME_NAME"] @@ -170,6 +173,15 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { }, Annotations: generateMergedAnnotations(op, k.Annotations), } + + // Apply custom labels + for _, l := range k.Labels { + parts := strings.SplitN(l, "=", 2) + if len(parts) > 1 { + meta.Labels[parts[0]] = parts[1] + } + } + // Mount SA token if a non-zero value for ServiceAccountName has been specified mountServiceAccountToken := k.ServiceAccountName != "" job := &batchv1.Job{ From 547000e1389ee20516470e43183a85074a6a2885 Mon Sep 17 00:00:00 2001 From: Carolyn Van Slyck Date: Wed, 27 Jan 2021 17:05:23 -0600 Subject: [PATCH 3/7] Fix setting CLEANUP_JOBS Signed-off-by: Carolyn Van Slyck --- driver/kubernetes/kubernetes.go | 2 +- driver/kubernetes/kubernetes_integration_test.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/driver/kubernetes/kubernetes.go b/driver/kubernetes/kubernetes.go index 823b9961..5fa53d7f 100644 --- a/driver/kubernetes/kubernetes.go +++ b/driver/kubernetes/kubernetes.go @@ -108,7 +108,7 @@ func (k *Driver) SetConfig(settings map[string]string) error { k.JobVolumeName = settings["JOB_VOLUME_NAME"] cleanup, err := strconv.ParseBool(settings["CLEANUP_JOBS"]) - if err != nil { + if err == nil { k.SkipCleanup 
= !cleanup } diff --git a/driver/kubernetes/kubernetes_integration_test.go b/driver/kubernetes/kubernetes_integration_test.go index 80b4fd88..3ce8b7de 100644 --- a/driver/kubernetes/kubernetes_integration_test.go +++ b/driver/kubernetes/kubernetes_integration_test.go @@ -89,8 +89,14 @@ func TestDriver_Run_Integration(t *testing.T) { } func TestDriver_SetConfig(t *testing.T) { + t.Run("cleanup_jobs", func(t *testing.T) { + d := Driver{} + d.SetConfig(map[string]string{ + "CLEANUP_JOBS": "false", + }) + assert.True(t, d.SkipCleanup) + }) t.Run("kubeconfig", func(t *testing.T) { - d := Driver{} err := d.SetConfig(map[string]string{ "KUBECONFIG": os.Getenv("KUBECONFIG"), From 0e2fef9e1c19e90740ad0764550f23d89aad3c4f Mon Sep 17 00:00:00 2001 From: Carolyn Van Slyck Date: Thu, 28 Jan 2021 16:02:50 -0600 Subject: [PATCH 4/7] Use shared volume for input files too When you mount a configmap or secret value as a file into a pod, it is read only, which doesn't fit the CNAB spec Signed-off-by: Carolyn Van Slyck --- driver/kubernetes/kubernetes.go | 187 +++++++++--------- .../kubernetes/kubernetes_integration_test.go | 57 +++++- driver/kubernetes/kubernetes_test.go | 62 +++++- 3 files changed, 203 insertions(+), 103 deletions(-) diff --git a/driver/kubernetes/kubernetes.go b/driver/kubernetes/kubernetes.go index 5fa53d7f..99e98f48 100644 --- a/driver/kubernetes/kubernetes.go +++ b/driver/kubernetes/kubernetes.go @@ -35,10 +35,18 @@ import ( ) const ( - k8sContainerName = "invocation" - k8sFileSecretVolume = "files" - numBackoffLoops = 6 - cnabPrefix = "cnab.io/" + k8sContainerName = "invocation" + numBackoffLoops = 6 + cnabPrefix = "cnab.io/" + SettingInCluster = "IN_CLUSTER" + SettingCleanupJobs = "CLEANUP_JOBS" + SettingLabels = "LABELS" + SettingJobVolumePath = "JOB_VOLUME_PATH" + SettingJobVolumeName = "JOB_VOLUME_NAME" + SettingKubeNamespace = "KUBE_NAMESPACE" + SettingServiceAccount = "SERVICE_ACCOUNT" + SettingKubeconfig = "KUBECONFIG" + SettingMasterUrl = "MASTER_URL" ) 
var ( @@ -86,47 +94,54 @@ func (k *Driver) Handles(imagetype string) bool { // Config returns the Kubernetes driver configuration options. func (k *Driver) Config() map[string]string { return map[string]string{ - "IN_CLUSTER": "Connect to the cluster using in-cluster environment variables", - "CLEANUP_JOBS": "If true, the job and associated secrets will be destroyed when it finishes running. If false, it will not be destroyed. The supported values are true and false. Defaults to true.", - "LABELS": "Labels to apply to cluster resources created by the driver, separated by whitespace.", - "JOB_VOLUME_PATH": "Path where the JOB_VOLUME_NAME is mounted locally", - "JOB_VOLUME_NAME": "Name of the PersistentVolumeClaim to mount with the invocation image to persist the bundle outputs.", - "KUBE_NAMESPACE": "Kubernetes namespace in which to run the invocation image", - "SERVICE_ACCOUNT": "Kubernetes service account to be mounted by the invocation image (if empty, no service account token will be mounted)", - "KUBECONFIG": "Absolute path to the kubeconfig file", - "MASTER_URL": "Kubernetes master endpoint", + SettingInCluster: "Connect to the cluster using in-cluster environment variables", + SettingCleanupJobs: "If true, the job and associated secrets will be destroyed when it finishes running. If false, it will not be destroyed. The supported values are true and false. 
Defaults to true.", + SettingLabels: "Labels to apply to cluster resources created by the driver, separated by whitespace.", + SettingJobVolumePath: "Path where the persistent volume is mounted", + SettingJobVolumeName: "Name of the PersistentVolumeClaim to mount which enables the driver to share files with the invocation image", + SettingKubeNamespace: "Kubernetes namespace in which to run the invocation image", + SettingServiceAccount: "Kubernetes service account to be mounted by the invocation image (if empty, no service account token will be mounted)", + SettingKubeconfig: "Absolute path to the kubeconfig file", + SettingMasterUrl: "Kubernetes master endpoint", } } // SetConfig sets Kubernetes driver configuration. func (k *Driver) SetConfig(settings map[string]string) error { k.setDefaults() - k.Namespace = settings["KUBE_NAMESPACE"] - k.ServiceAccountName = settings["SERVICE_ACCOUNT"] - k.Labels = strings.Split(settings["LABELS"], " ") - k.JobVolumePath = settings["JOB_VOLUME_PATH"] - k.JobVolumeName = settings["JOB_VOLUME_NAME"] + k.Namespace = settings[SettingKubeNamespace] + k.ServiceAccountName = settings[SettingServiceAccount] + k.Labels = strings.Split(settings[SettingLabels], " ") - cleanup, err := strconv.ParseBool(settings["CLEANUP_JOBS"]) + k.JobVolumePath = settings[SettingJobVolumePath] + if k.JobVolumePath == "" { + return errors.Errorf("setting %s is required", SettingJobVolumePath) + } + k.JobVolumeName = settings[SettingJobVolumeName] + if k.JobVolumeName == "" { + return errors.Errorf("setting %s is required", SettingJobVolumeName) + } + + cleanup, err := strconv.ParseBool(settings[SettingCleanupJobs]) if err == nil { k.SkipCleanup = !cleanup } var conf *rest.Config - if incluster, _ := strconv.ParseBool(settings["IN_CLUSTER"]); incluster { + if incluster, _ := strconv.ParseBool(settings[SettingInCluster]); incluster { conf, err = rest.InClusterConfig() if err != nil { return errors.Wrap(err, "error retrieving in-cluster kubernetes 
configuration") } } else { var kubeconfig string - if kpath := settings["KUBECONFIG"]; kpath != "" { + if kpath := settings[SettingKubeconfig]; kpath != "" { kubeconfig = kpath } else if home := homeDir(); home != "" { kubeconfig = filepath.Join(home, ".kube", "config") } - conf, err = clientcmd.BuildConfigFromFlags(settings["MASTER_URL"], kubeconfig) + conf, err = clientcmd.BuildConfigFromFlags(settings[SettingMasterUrl], kubeconfig) if err != nil { return errors.Wrapf(err, "error retrieving external kubernetes configuration using configuration:\n%v", settings) } @@ -165,6 +180,12 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { return driver.OperationResult{}, fmt.Errorf("KUBE_NAMESPACE is required") } + const sharedVolumeName = "cnab-driver-share" + err = k.initJobVolumes(err) + if err != nil { + return driver.OperationResult{}, err + } + meta := metav1.ObjectMeta{ Namespace: k.Namespace, GenerateName: generateNameTemplate(op), @@ -184,6 +205,7 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { // Mount SA token if a non-zero value for ServiceAccountName has been specified mountServiceAccountToken := k.ServiceAccountName != "" + job := &batchv1.Job{ ObjectMeta: meta, Spec: batchv1.JobSpec{ @@ -200,6 +222,17 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { AutomountServiceAccountToken: &mountServiceAccountToken, RestartPolicy: v1.RestartPolicyNever, Tolerations: k.Tolerations, + Volumes: []v1.Volume{ + // This is a shared volume between the driver and the job so that files be shared + { + Name: sharedVolumeName, + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: k.JobVolumeName, + }, + }, + }, + }, }, }, }, @@ -220,6 +253,13 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { }, }, ImagePullPolicy: v1.PullIfNotPresent, + VolumeMounts: []v1.VolumeMount{ + { + Name: sharedVolumeName, + 
MountPath: "/cnab/app/outputs", + SubPath: "outputs", + }, + }, } if len(op.Environment) > 0 { @@ -248,48 +288,24 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { } if len(op.Files) > 0 { - secret, mounts := generateFileSecret(op.Files) - secret.ObjectMeta = meta - secret.ObjectMeta.GenerateName += "files-" - secret, err := k.secrets.Create(secret) - if err != nil { - return driver.OperationResult{}, err - } - if !k.SkipCleanup { - defer k.deleteSecret(secret.ObjectMeta.Name) - } - - job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, v1.Volume{ - Name: k8sFileSecretVolume, - VolumeSource: v1.VolumeSource{ - Secret: &v1.SecretVolumeSource{ - SecretName: secret.ObjectMeta.Name, - }, - }, - }) - container.VolumeMounts = append(container.VolumeMounts, mounts...) - } + // Write the files to the inputs directory on the shared volume and mount them individually to the desired location in the invocation image + for inputRelPath, contents := range op.Files { + inputPath := filepath.Join(k.JobVolumePath, "inputs", inputRelPath) + err = os.MkdirAll(filepath.Dir(inputPath), 0700) + if err != nil { + return driver.OperationResult{}, errors.Wrapf(err, "error creating directory for file %s on the shared job volume %s", inputPath, k.JobVolumeName) + } + err = ioutil.WriteFile(inputPath, []byte(contents), 0600) + if err != nil { + return driver.OperationResult{}, errors.Wrapf(err, "error writing file %s to the shared job volume %s", inputPath, k.JobVolumeName) + } - // Mount a volume to store the bundle outputs - if k.JobVolumeName != "" { - if k.JobVolumePath == "" { - return driver.OperationResult{}, errors.New("no JobVolumePath was specified but JobVolumeName was provided") + container.VolumeMounts = append(container.VolumeMounts, v1.VolumeMount{ + Name: sharedVolumeName, + MountPath: inputRelPath, + SubPath: path.Join("inputs", inputRelPath), + }) } - - job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, 
v1.Volume{ - Name: "outputs", - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: k.JobVolumeName, - }, - }, - }) - container.VolumeMounts = append(container.VolumeMounts, v1.VolumeMount{ - Name: "outputs", - MountPath: "/cnab/app/outputs", - }) - } else if len(op.Bundle.Outputs) > 0 { - return driver.OperationResult{}, errors.New("no PersistentVolumeClaim was specified for JobVolumeName but the bundle defines outputs") } job.Spec.Template.Spec.Containers = []v1.Container{container} @@ -331,6 +347,24 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { return opResult, opErr.ErrorOrNil() } +func (k *Driver) initJobVolumes(err error) error { + // Store all job input files in ./inputs and outputs in ./outputs on the shared volume + + inputsDir := filepath.Join(k.JobVolumePath, "inputs") + err = os.Mkdir(inputsDir, 0700) + if err != nil && !os.IsExist(err) { + return errors.Wrapf(err, "error creating inputs directory %s on shared job volume %s", inputsDir, k.JobVolumeName) + } + + outputsDir := filepath.Join(k.JobVolumePath, "outputs") + err = os.Mkdir(outputsDir, 0700) + if err != nil && !os.IsExist(err) { + return errors.Wrapf(err, "error creating outputs directory %s on shared job volume %s", outputsDir, k.JobVolumeName) + } + + return nil +} + // fetchOutputs collects any outputs created by the job that were persisted to JobVolumeName (which is mounted locally // at JobVolumePath). 
// @@ -345,7 +379,8 @@ func (k *Driver) fetchOutputs(op *driver.Operation) (driver.OperationResult, err return opResult, nil } - err := filepath.Walk(k.JobVolumePath, func(path string, info os.FileInfo, err error) error { + outputsDir := filepath.Join(k.JobVolumePath, "outputs") + err := filepath.Walk(outputsDir, func(currentPath string, info os.FileInfo, err error) error { // skip directories because we're gathering file contents if info.IsDir() { return nil @@ -355,7 +390,7 @@ func (k *Driver) fetchOutputs(op *driver.Operation) (driver.OperationResult, err pathInContainer := path.Join("/cnab/app/outputs", info.Name()) outputName, shouldCapture := op.Outputs[pathInContainer] if shouldCapture { - contents, err = ioutil.ReadFile(path) + contents, err = ioutil.ReadFile(currentPath) if err != nil { return errors.Wrapf(err, "error while reading %q from outputs", pathInContainer) } @@ -520,30 +555,6 @@ func generateMergedAnnotations(op *driver.Operation, mergeWith map[string]string return anno } -func generateFileSecret(files map[string]string) (*v1.Secret, []v1.VolumeMount) { - size := len(files) - data := make(map[string]string, size) - mounts := make([]v1.VolumeMount, size) - - i := 0 - for path, contents := range files { - key := strings.Replace(filepath.ToSlash(path), "/", "_", -1) - data[key] = contents - mounts[i] = v1.VolumeMount{ - Name: k8sFileSecretVolume, - MountPath: path, - SubPath: key, - } - i++ - } - - secret := &v1.Secret{ - StringData: data, - } - - return secret, mounts -} - func newSingleFieldSelector(k, v string) string { return labels.Set(map[string]string{ k: v, diff --git a/driver/kubernetes/kubernetes_integration_test.go b/driver/kubernetes/kubernetes_integration_test.go index 3ce8b7de..403804fd 100644 --- a/driver/kubernetes/kubernetes_integration_test.go +++ b/driver/kubernetes/kubernetes_integration_test.go @@ -4,11 +4,17 @@ package kubernetes import ( "bytes" + "io/ioutil" "os" "testing" "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/clientcmd" "github.com/cnabio/cnab-go/bundle" "github.com/cnabio/cnab-go/driver" @@ -16,10 +22,6 @@ import ( func TestDriver_Run_Integration(t *testing.T) { k := &Driver{} - k.SetConfig(map[string]string{ - "KUBE_NAMESPACE": "default", - "KUBECONFIG": os.Getenv("KUBECONFIG"), - }) k.ActiveDeadlineSeconds = 60 cases := []struct { @@ -33,6 +35,7 @@ func TestDriver_Run_Integration(t *testing.T) { op: &driver.Operation{ Installation: "example", Action: "install", + Bundle: &bundle.Bundle{}, Image: bundle.InvocationImage{ BaseImage: bundle.BaseImage{ Image: "cnab/helloworld", @@ -51,6 +54,7 @@ func TestDriver_Run_Integration(t *testing.T) { op: &driver.Operation{ Installation: "greater-than-300-length-and-special-chars/-*()+%@qcUYSfR9MS3BqR0kRDHe2K5EHJa8BJGrcoiDVvsDpATjIkrk4PWrdysIqFpJzrKHauRWfBjjF889Qdc5DUBQ6gKy8Qezkl9HyCmo88hMrkaeVPxknFt0nWRm0xqYhoaY0Db7ZcljchbBAufVvH5l0T7iBdg1E0iSCTZw0v5rCAEclNwzjpg7DfLq2SBdJ0W8XdyQSWVMpakjraXP9droq8ol70gX0QuqAZDkGtHyxet8Akv9lGCCVVFuY4kBdkW3LDHoxl0xz2EZzXja1GTlYui0Bpx0TGqMLish9tBOhuC7", Action: "install", + Bundle: &bundle.Bundle{}, Image: bundle.InvocationImage{ BaseImage: bundle.BaseImage{ Image: "cnab/helloworld", @@ -76,7 +80,24 @@ func TestDriver_Run_Integration(t *testing.T) { tc.op.Environment["CNAB_ACTION"] = tc.op.Action tc.op.Environment["CNAB_INSTALLATION_NAME"] = tc.op.Installation - _, err := k.Run(tc.op) + // Create a volume to share data with the invocation image + pvc, cleanup := createTestPVC(t) + defer cleanup() + + // Simulate mounting the shared volume + sharedDir, err := ioutil.TempDir("", "cnab-go") + require.NoError(t, err, "could not create test directory") + defer os.RemoveAll(sharedDir) + + err = k.SetConfig(map[string]string{ + SettingJobVolumePath: sharedDir, + 
SettingJobVolumeName: pvc, + SettingKubeNamespace: "default", + SettingKubeconfig: os.Getenv("KUBECONFIG"), + }) + require.NoError(t, err, "SetConfig failed") + + _, err = k.Run(tc.op) if tc.err != nil { assert.EqualError(t, err, tc.err.Error()) @@ -88,6 +109,32 @@ func TestDriver_Run_Integration(t *testing.T) { } } +func createTestPVC(t *testing.T) (string, func()) { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "cnab-driver-shared", + Namespace: "default", + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + Resources: v1.ResourceRequirements{Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceStorage: resource.MustParse("64Mi"), + }}, + }, + } + kubeconfig := os.Getenv("KUBECONFIG") + conf, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + require.NoError(t, err, "BuildConfigFromFlags failed") + coreClient, err := coreclientv1.NewForConfig(conf) + pvcClient := coreClient.PersistentVolumeClaims("default") + pvc, err = pvcClient.Create(pvc) + require.NoError(t, err, "create pvc failed") + + return pvc.Name, func() { + pvcClient.Delete(pvc.Name, &metav1.DeleteOptions{}) + } +} + func TestDriver_SetConfig(t *testing.T) { t.Run("cleanup_jobs", func(t *testing.T) { d := Driver{} diff --git a/driver/kubernetes/kubernetes_test.go b/driver/kubernetes/kubernetes_test.go index ff69602a..676c62c6 100644 --- a/driver/kubernetes/kubernetes_test.go +++ b/driver/kubernetes/kubernetes_test.go @@ -16,6 +16,11 @@ import ( ) func TestDriver_Run(t *testing.T) { + // Simulate the shared volume + sharedDir, err := ioutil.TempDir("", "cnab-go") + require.NoError(t, err, "could not create test directory") + defer os.RemoveAll(sharedDir) + client := fake.NewSimpleClientset() namespace := "default" k := Driver{ @@ -23,6 +28,8 @@ func TestDriver_Run(t *testing.T) { jobs: client.BatchV1().Jobs(namespace), secrets: client.CoreV1().Secrets(namespace), pods: 
client.CoreV1().Pods(namespace), + JobVolumePath: sharedDir, + JobVolumeName: "cnab-driver-shared", SkipCleanup: true, skipJobStatusCheck: true, } @@ -36,7 +43,7 @@ func TestDriver_Run(t *testing.T) { }, } - _, err := k.Run(&op) + _, err = k.Run(&op) assert.NoError(t, err) jobList, _ := k.jobs.List(metav1.ListOptions{}) @@ -46,13 +53,16 @@ func TestDriver_Run(t *testing.T) { assert.Equal(t, len(secretList.Items), 1, "expected one secret to be created") } -func TestDriver_RunWithOutputs(t *testing.T) { - // Simulate that the bundle generated output "foo" - outputs, err := ioutil.TempDir("", "cnab-go") +func TestDriver_RunWithSharedFiles(t *testing.T) { + // Simulate the shared volume + sharedDir, err := ioutil.TempDir("", "cnab-go") require.NoError(t, err, "could not create test directory") - defer os.RemoveAll(outputs) + defer os.RemoveAll(sharedDir) - err = ioutil.WriteFile(filepath.Join(outputs, "foo"), []byte("foobar"), 0644) + // Simulate that the bundle generated output "foo" + err = os.Mkdir(filepath.Join(sharedDir, "outputs"), 0755) + require.NoError(t, err, "could not create outputs directory") + err = ioutil.WriteFile(filepath.Join(sharedDir, "outputs/foo"), []byte("foobar"), 0644) require.NoError(t, err, "could not write output foo") client := fake.NewSimpleClientset() @@ -62,8 +72,8 @@ func TestDriver_RunWithOutputs(t *testing.T) { jobs: client.BatchV1().Jobs(namespace), secrets: client.CoreV1().Secrets(namespace), pods: client.CoreV1().Pods(namespace), - JobVolumePath: outputs, - JobVolumeName: "outputs", + JobVolumePath: sharedDir, + JobVolumeName: "cnab-driver-shared", SkipCleanup: true, skipJobStatusCheck: true, } @@ -85,6 +95,9 @@ func TestDriver_RunWithOutputs(t *testing.T) { Environment: map[string]string{ "foo": "bar", }, + Files: map[string]string{ + "/cnab/app/someinput": "input value", + }, } opResult, err := k.Run(&op) @@ -98,6 +111,11 @@ func TestDriver_RunWithOutputs(t *testing.T) { require.Contains(t, opResult.Outputs, "foo", "expected the 
foo output to be collected") assert.Equal(t, "foobar", opResult.Outputs["foo"], "invalid output value for foo ") + + wantInputFile := filepath.Join(sharedDir, "inputs/cnab/app/someinput") + inputContents, err := ioutil.ReadFile(wantInputFile) + require.NoErrorf(t, err, "could not read generated input file %s on shared volume", wantInputFile) + assert.Equal(t, "input value", string(inputContents), "invalid input file contents") } func TestImageWithDigest(t *testing.T) { @@ -221,11 +239,33 @@ func TestGenerateNameTemplate(t *testing.T) { } func TestDriver_SetConfig_Fails(t *testing.T) { + t.Run("job volume name missing", func(t *testing.T) { + + d := Driver{} + err := d.SetConfig(map[string]string{ + "JOB_VOLUME_PATH": "/tmp", + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "setting JOB_VOLUME_NAME is required") + }) + + t.Run("job volume path missing", func(t *testing.T) { + + d := Driver{} + err := d.SetConfig(map[string]string{ + "JOB_VOLUME_Name": "cnab-driver-shared", + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "setting JOB_VOLUME_PATH is required") + }) + t.Run("kubeconfig invalid", func(t *testing.T) { d := Driver{} err := d.SetConfig(map[string]string{ - "KUBECONFIG": "invalid", + "KUBECONFIG": "invalid", + "JOB_VOLUME_NAME": "cnab-driver-shared", + "JOB_VOLUME_PATH": "/tmp", }) require.Error(t, err) assert.Contains(t, err.Error(), "error retrieving external kubernetes configuration using configuration") @@ -239,7 +279,9 @@ func TestDriver_SetConfig_Fails(t *testing.T) { d := Driver{} err := d.SetConfig(map[string]string{ - "IN_CLUSTER": "true", + "IN_CLUSTER": "true", + "JOB_VOLUME_NAME": "cnab-driver-shared", + "JOB_VOLUME_PATH": "/tmp", }) require.Error(t, err) assert.Contains(t, err.Error(), "error retrieving in-cluster kubernetes configuration") From c55ff3ee5b48a8aef8c790ad8f43e77a978dc7ed Mon Sep 17 00:00:00 2001 From: Carolyn Van Slyck Date: Mon, 1 Feb 2021 09:07:37 -0600 Subject: [PATCH 5/7] Remove 
requiredCompletions from k8s driver The only value that ever makes sense for requiredCompletions is 1 because we aren't running parallel pods for the bundle job. Signed-off-by: Carolyn Van Slyck --- driver/kubernetes/kubernetes.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/driver/kubernetes/kubernetes.go b/driver/kubernetes/kubernetes.go index 99e98f48..8690eae7 100644 --- a/driver/kubernetes/kubernetes.go +++ b/driver/kubernetes/kubernetes.go @@ -72,7 +72,6 @@ type Driver struct { secrets coreclientv1.SecretInterface pods coreclientv1.PodInterface deletionPolicy metav1.DeletionPropagation - requiredCompletions int32 } // New initializes a Kubernetes driver. @@ -154,7 +153,6 @@ func (k *Driver) setDefaults() { k.SkipCleanup = false k.BackoffLimit = 0 k.ActiveDeadlineSeconds = 300 - k.requiredCompletions = 1 k.deletionPolicy = metav1.DeletePropagationBackground } @@ -210,7 +208,7 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { ObjectMeta: meta, Spec: batchv1.JobSpec{ ActiveDeadlineSeconds: &k.ActiveDeadlineSeconds, - Completions: &k.requiredCompletions, + Completions: defaultInt32Ptr(1), BackoffLimit: &k.BackoffLimit, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ @@ -438,9 +436,7 @@ func (k *Driver) watchJobStatusAndLogs(podSelector metav1.ListOptions, jobSelect } // Wait for pod logs to finish printing - for i := 0; i < int(k.requiredCompletions); i++ { - <-logsStreamingComplete - } + <-logsStreamingComplete return err } From 3ba15dcfdafcfc956fa7323bf424576ddeed941c Mon Sep 17 00:00:00 2001 From: Carolyn Van Slyck Date: Mon, 1 Feb 2021 09:13:43 -0600 Subject: [PATCH 6/7] Improve defaulting for kubernetes driver * When LimitCPU or LimitMemory is 0, do not set a limit. We don't set a request size, and requesting 0 is not the same as not having a requested resource quantity. This can lead to increased pod evictions. 
* Clarify that LimitCPU and LimitMemory are also used for the requested resources. * When ActiveDeadlineSeconds is 0, do not set a deadline. * Stop defaulting ActiveDeadlineSeconds to 5 minutes. It will cut off a bundle mid-execution, which isn't a great default. * Split reading the configuration in SetConfig and attempting to connect to the cluster, so that we can unit test the settings logic. * Move validation for KUBE_NAMESPACE into SetConfig so that any errors can be reported immediately. * Add comments for all of the driver settings Signed-off-by: Carolyn Van Slyck --- driver/kubernetes/kubernetes.go | 186 ++++++++++++++---- .../kubernetes/kubernetes_integration_test.go | 17 +- driver/kubernetes/kubernetes_test.go | 160 ++++++++++++--- 3 files changed, 278 insertions(+), 85 deletions(-) diff --git a/driver/kubernetes/kubernetes.go b/driver/kubernetes/kubernetes.go index 8690eae7..103413b7 100644 --- a/driver/kubernetes/kubernetes.go +++ b/driver/kubernetes/kubernetes.go @@ -46,7 +46,7 @@ const ( SettingKubeNamespace = "KUBE_NAMESPACE" SettingServiceAccount = "SERVICE_ACCOUNT" SettingKubeconfig = "KUBECONFIG" - SettingMasterUrl = "MASTER_URL" + SettingMasterURL = "MASTER_URL" ) var ( @@ -55,23 +55,77 @@ var ( // Driver runs an invocation image in a Kubernetes cluster. type Driver struct { - Namespace string - ServiceAccountName string - Annotations map[string]string - Labels []string - LimitCPU resource.Quantity - LimitMemory resource.Quantity - JobVolumePath string - JobVolumeName string - Tolerations []v1.Toleration + // Namespace where the bundle's job should be executed. Required. + Namespace string + + // ServiceAccountName is the name of the ServiceAccount under which the + // bundle's job should be executed. Leave blank to execute as the default + // ServiceAccount of the namespace. + ServiceAccountName string + + // Annotations that should be applied to any Kubernetes resources created + // by the driver. 
+	Annotations map[string]string
+
+	// Labels that should be applied to any Kubernetes resources created
+	// by the driver.
+	Labels []string
+
+	// LimitCPU is the amount of CPU to request and the limit for the bundle's job.
+	// Set to zero to not use a limit. Defaults to zero.
+	LimitCPU resource.Quantity
+
+	// LimitMemory is amount of memory to request and the limit for the bundle's job.
+	// Set to zero to not use a limit. Defaults to zero.
+	LimitMemory resource.Quantity
+
+	// JobVolumePath is the local path where the persistent volume is mounted to share
+	// data between the driver and the bundle.
+	JobVolumePath string
+
+	// JobVolumeName is the name of the persistent volume claim that should be mounted
+	// to the bundle's pod to share data between the driver and the bundle.
+	//
+	// Files that should be injected into the bundle are stored in ./inputs and the
+	// directory ./outputs is mounted to /cnab/app/outputs to collect any bundle
+	// outputs generated.
+	JobVolumeName string
+
+	// Tolerations is an optional list of tolerations to apply to the bundle's job.
+	Tolerations []v1.Toleration
+
+	// ActiveDeadlineSeconds is the time limit for running the driver's
+	// execution, including retries. Set to 0 to not use a deadline. Defaults
+	// to 0 (no deadline), so bundle runs are not cut off mid-execution.
+	//
+	// Setting this value to a non-zero value can cause bundles that would have
+	// been successful, or that have even completed successfully, to halt abruptly
+	// before the bundle's execution run can be recorded in claim storage.
 	ActiveDeadlineSeconds int64
+
+	// BackoffLimit is the number of times to retry the driver's
+	// execution. Defaults to 0, so failed executions will not be retried.
+ BackoffLimit int32 + + // SkipCleanup specifies if the driver should remove any Kubernetes + // resources that it created when the driver execution completes. + SkipCleanup bool + + // InCluster indicates if the driver should connect to the cluster using + // in-cluster environment variables. + InCluster bool + + // Kubeconfig is the absolute path to the kubeconfig file. + Kubeconfig string + + // MasterURL is the Kubernetes API endpoint. + MasterURL string + + skipJobStatusCheck bool + jobs batchclientv1.JobInterface + secrets coreclientv1.SecretInterface + pods coreclientv1.PodInterface + deletionPolicy metav1.DeletionPropagation } // New initializes a Kubernetes driver. @@ -101,7 +155,7 @@ func (k *Driver) Config() map[string]string { SettingKubeNamespace: "Kubernetes namespace in which to run the invocation image", SettingServiceAccount: "Kubernetes service account to be mounted by the invocation image (if empty, no service account token will be mounted)", SettingKubeconfig: "Absolute path to the kubeconfig file", - SettingMasterUrl: "Kubernetes master endpoint", + SettingMasterURL: "Kubernetes master endpoint", } } @@ -109,6 +163,10 @@ func (k *Driver) Config() map[string]string { func (k *Driver) SetConfig(settings map[string]string) error { k.setDefaults() k.Namespace = settings[SettingKubeNamespace] + if k.Namespace == "" { + return errors.Errorf("setting %s is required", SettingKubeNamespace) + } + k.ServiceAccountName = settings[SettingServiceAccount] k.Labels = strings.Split(settings[SettingLabels], " ") @@ -126,13 +184,15 @@ func (k *Driver) SetConfig(settings map[string]string) error { k.SkipCleanup = !cleanup } - var conf *rest.Config - if incluster, _ := strconv.ParseBool(settings[SettingInCluster]); incluster { - conf, err = rest.InClusterConfig() + if inClusterVal, ok := settings[SettingInCluster]; ok { + inCluster, err := strconv.ParseBool(inClusterVal) if err != nil { - return errors.Wrap(err, "error retrieving in-cluster kubernetes 
configuration") + return errors.Wrapf(err, "invalid value %q for %s", inClusterVal, SettingInCluster) } - } else { + k.InCluster = inCluster + } + + if !k.InCluster { var kubeconfig string if kpath := settings[SettingKubeconfig]; kpath != "" { kubeconfig = kpath @@ -140,22 +200,43 @@ func (k *Driver) SetConfig(settings map[string]string) error { kubeconfig = filepath.Join(home, ".kube", "config") } - conf, err = clientcmd.BuildConfigFromFlags(settings[SettingMasterUrl], kubeconfig) - if err != nil { - return errors.Wrapf(err, "error retrieving external kubernetes configuration using configuration:\n%v", settings) - } + k.Kubeconfig = kubeconfig + k.MasterURL = settings[SettingMasterURL] } - return k.setClient(conf) + return nil } func (k *Driver) setDefaults() { k.SkipCleanup = false k.BackoffLimit = 0 - k.ActiveDeadlineSeconds = 300 + k.ActiveDeadlineSeconds = 0 // Default to not cutting off a bundle mid-run k.deletionPolicy = metav1.DeletePropagationBackground } +func (k *Driver) initClient() error { + // Check if a test has already configured a client + if k.jobs != nil { + return nil + } + + var conf *rest.Config + var err error + if k.InCluster { + conf, err = rest.InClusterConfig() + if err != nil { + return errors.Wrap(err, "error retrieving in-cluster kubernetes configuration") + } + } else { + conf, err = clientcmd.BuildConfigFromFlags(k.MasterURL, k.Kubeconfig) + if err != nil { + return errors.Wrapf(err, "error retrieving external kubernetes configuration for %s with kubeconfig %s", k.MasterURL, k.Kubeconfig) + } + } + + return k.setClient(conf) +} + func (k *Driver) setClient(conf *rest.Config) error { coreClient, err := coreclientv1.NewForConfig(conf) if err != nil { @@ -174,8 +255,9 @@ func (k *Driver) setClient(conf *rest.Config) error { // Run executes the operation inside of the invocation image. 
 func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) {
-	if k.Namespace == "" {
-		return driver.OperationResult{}, fmt.Errorf("KUBE_NAMESPACE is required")
+	err := k.initClient()
+	if err != nil {
+		return driver.OperationResult{}, err
 	}
 
 	const sharedVolumeName = "cnab-driver-share"
@@ -207,7 +289,7 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) {
 	job := &batchv1.Job{
 		ObjectMeta: meta,
 		Spec: batchv1.JobSpec{
-			ActiveDeadlineSeconds: &k.ActiveDeadlineSeconds,
+			ActiveDeadlineSeconds: defaultInt64Ptr(k.ActiveDeadlineSeconds),
 			Completions:           defaultInt32Ptr(1),
 			BackoffLimit:          &k.BackoffLimit,
 			Template: v1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
@@ -241,15 +323,9 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) {
 	}
 
 	container := v1.Container{
-		Name:    k8sContainerName,
-		Image:   img,
-		Command: []string{"/cnab/app/run"},
-		Resources: v1.ResourceRequirements{
-			Limits: v1.ResourceList{
-				v1.ResourceCPU:    k.LimitCPU,
-				v1.ResourceMemory: k.LimitMemory,
-			},
-		},
+		Name:            k8sContainerName,
+		Image:           img,
+		Command:         []string{"/cnab/app/run"},
 		ImagePullPolicy: v1.PullIfNotPresent,
 		VolumeMounts: []v1.VolumeMount{
 			{
@@ -260,6 +336,14 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) {
 		},
 	}
 
+	limits := v1.ResourceList{}
+	if !k.LimitCPU.IsZero() {
+		limits[v1.ResourceCPU] = k.LimitCPU
+	}
+	if !k.LimitMemory.IsZero() {
+		limits[v1.ResourceMemory] = k.LimitMemory
+	}
+	container.Resources.Limits = limits
 
 	if len(op.Environment) > 0 {
 		secret := &v1.Secret{
 			ObjectMeta: meta,
@@ -363,6 +447,26 @@ func (k *Driver) initJobVolumes(err error) error {
 	return nil
 }
 
+// defaultInt64Ptr converts an integer value to a pointer, treating values less
+// than or equal to zero as nil.
+func defaultInt64Ptr(value int64) *int64 {
+	var ptr *int64
+	if value > 0 {
+		ptr = &value
+	}
+	return ptr
+}
+
+// defaultInt32Ptr converts an integer value to a pointer, treating values less
+// than or equal to zero as nil.
+func defaultInt32Ptr(value int32) *int32 { + var ptr *int32 + if value > 0 { + ptr = &value + } + return ptr +} + // fetchOutputs collects any outputs created by the job that were persisted to JobVolumeName (which is mounted locally // at JobVolumePath). // diff --git a/driver/kubernetes/kubernetes_integration_test.go b/driver/kubernetes/kubernetes_integration_test.go index 403804fd..57056350 100644 --- a/driver/kubernetes/kubernetes_integration_test.go +++ b/driver/kubernetes/kubernetes_integration_test.go @@ -135,19 +135,12 @@ func createTestPVC(t *testing.T) (string, func()) { } } -func TestDriver_SetConfig(t *testing.T) { - t.Run("cleanup_jobs", func(t *testing.T) { - d := Driver{} - d.SetConfig(map[string]string{ - "CLEANUP_JOBS": "false", - }) - assert.True(t, d.SkipCleanup) - }) +func TestDriver_InitClient(t *testing.T) { t.Run("kubeconfig", func(t *testing.T) { - d := Driver{} - err := d.SetConfig(map[string]string{ - "KUBECONFIG": os.Getenv("KUBECONFIG"), - }) + d := Driver{ + Kubeconfig: os.Getenv("KUBECONFIG"), + } + err := d.initClient() require.NoError(t, err) }) } diff --git a/driver/kubernetes/kubernetes_test.go b/driver/kubernetes/kubernetes_test.go index 676c62c6..36fc8def 100644 --- a/driver/kubernetes/kubernetes_test.go +++ b/driver/kubernetes/kubernetes_test.go @@ -238,52 +238,148 @@ func TestGenerateNameTemplate(t *testing.T) { } } -func TestDriver_SetConfig_Fails(t *testing.T) { - t.Run("job volume name missing", func(t *testing.T) { +func TestDriver_ConfigureJob(t *testing.T) { + // Simulate the shared volume + sharedDir, err := ioutil.TempDir("", "cnab-go") + require.NoError(t, err, "could not create test directory") + defer os.RemoveAll(sharedDir) + + client := fake.NewSimpleClientset() + namespace := "myns" + k := Driver{ + Namespace: namespace, + ActiveDeadlineSeconds: 0, + Annotations: map[string]string{"b": "2"}, + Labels: []string{"a=1"}, + jobs: client.BatchV1().Jobs(namespace), + secrets: client.CoreV1().Secrets(namespace), + pods: 
client.CoreV1().Pods(namespace), + JobVolumePath: sharedDir, + JobVolumeName: "cnab-driver-shared", + SkipCleanup: true, + skipJobStatusCheck: true, + } + op := driver.Operation{ + Action: "install", + Installation: "mybundle", + Revision: "abc123", + Bundle: &bundle.Bundle{}, + Image: bundle.InvocationImage{BaseImage: bundle.BaseImage{Image: "foo/bar"}}, + } + + _, err = k.Run(&op) + assert.NoError(t, err) + + jobList, _ := k.jobs.List(metav1.ListOptions{}) + assert.Len(t, jobList.Items, 1, "expected one job to be created") + + job := jobList.Items[0] + assert.Nil(t, job.Spec.ActiveDeadlineSeconds, "incorrect Job ActiveDeadlineSeconds") + assert.Equal(t, int32(1), *job.Spec.Completions, "incorrect Job Completions") + assert.Equal(t, int32(0), *job.Spec.BackoffLimit, "incorrect Job BackoffLimit") + + wantLabels := map[string]string{ + "a": "1", + "cnab.io/driver": "kubernetes"} + assert.Equal(t, wantLabels, job.Labels, "Incorrect Job Labels") + + wantAnnotations := map[string]string{ + "b": "2", + "cnab.io/action": "install", + "cnab.io/installation": "mybundle", + "cnab.io/revision": "abc123"} + assert.Equal(t, wantAnnotations, job.Annotations, "Incorrect Job Annotations") + + pod := job.Spec.Template + assert.Equal(t, wantLabels, pod.Labels, "incorrect Pod Labels") + assert.Equal(t, wantAnnotations, pod.Annotations, "incorrect Pod Annotations") + assert.Len(t, pod.Spec.Containers, 1, "expected one container in the pod") + + container := pod.Spec.Containers[0] + assert.Empty(t, container.Resources.Limits, "incorrect Limits") +} + +func TestDriver_SetConfig(t *testing.T) { + validSettings := func() map[string]string { + return map[string]string{ + SettingInCluster: "true", + SettingKubeconfig: "/tmp/kube.config", + SettingMasterURL: "http://example.com", + SettingKubeNamespace: "default", + SettingJobVolumeName: "cnab-driver-shared", + SettingJobVolumePath: "/tmp", + SettingCleanupJobs: "false", + SettingLabels: "a=1 b=2", + SettingServiceAccount: "myacct", + } + } 
+ t.Run("valid config", func(t *testing.T) { d := Driver{} - err := d.SetConfig(map[string]string{ - "JOB_VOLUME_PATH": "/tmp", - }) - require.Error(t, err) - assert.Contains(t, err.Error(), "setting JOB_VOLUME_NAME is required") + err := d.SetConfig(validSettings()) + require.NoError(t, err) + + assert.Equal(t, d.Namespace, "default", "incorrect Namespace value") + assert.Equal(t, d.JobVolumeName, "cnab-driver-shared", "incorrect JobVolumeName value") + assert.Equal(t, d.JobVolumePath, "/tmp", "incorrect JobVolumePath value") + assert.True(t, d.SkipCleanup, "incorrect SkipCleanup value") + assert.Equal(t, []string{"a=1", "b=2"}, d.Labels, "incorrect Labels value") + assert.Equal(t, "myacct", d.ServiceAccountName, "incorrect ServiceAccountName") + assert.Equal(t, int64(0), d.ActiveDeadlineSeconds, "ActiveDeadlineSeconds should be defaulted to 0 so bundle runs are not cut off") }) - t.Run("job volume path missing", func(t *testing.T) { + t.Run("incluster config", func(t *testing.T) { + d := Driver{} + settings := validSettings() + settings[SettingInCluster] = "true" + err := d.SetConfig(settings) + require.NoError(t, err) + + assert.True(t, d.InCluster, "incorrect InCluster value") + assert.Empty(t, d.Kubeconfig, "incorrect Kubeconfig value") + assert.Empty(t, d.MasterURL, "incorrect MasterUrl value") + }) + t.Run("kubeconfig", func(t *testing.T) { d := Driver{} - err := d.SetConfig(map[string]string{ - "JOB_VOLUME_Name": "cnab-driver-shared", - }) - require.Error(t, err) - assert.Contains(t, err.Error(), "setting JOB_VOLUME_PATH is required") + settings := validSettings() + settings[SettingInCluster] = "false" + err := d.SetConfig(settings) + require.NoError(t, err) + + assert.False(t, d.InCluster, "incorrect InCluster value") + assert.Equal(t, "/tmp/kube.config", d.Kubeconfig, "incorrect Kubeconfig value") + assert.Equal(t, "http://example.com", d.MasterURL, "incorrect MasterUrl value") }) - t.Run("kubeconfig invalid", func(t *testing.T) { + t.Run("master url 
optional", func(t *testing.T) { + d := Driver{} + settings := validSettings() + settings[SettingInCluster] = "false" + settings[SettingMasterURL] = "" + err := d.SetConfig(settings) + require.NoError(t, err) + + assert.False(t, d.InCluster, "incorrect InCluster value") + assert.Equal(t, "/tmp/kube.config", d.Kubeconfig, "incorrect Kubeconfig value") + assert.Empty(t, d.MasterURL, "incorrect MasterUrl value") + }) + t.Run("job volume name missing", func(t *testing.T) { d := Driver{} - err := d.SetConfig(map[string]string{ - "KUBECONFIG": "invalid", - "JOB_VOLUME_NAME": "cnab-driver-shared", - "JOB_VOLUME_PATH": "/tmp", - }) + settings := validSettings() + settings[SettingJobVolumeName] = "" + err := d.SetConfig(settings) require.Error(t, err) - assert.Contains(t, err.Error(), "error retrieving external kubernetes configuration using configuration") + assert.Contains(t, err.Error(), "setting JOB_VOLUME_NAME is required") }) - t.Run("use in-cluster outside cluster", func(t *testing.T) { - // Force this to fail even when the tests are run inside brigade - orig := os.Getenv("KUBERNETES_SERVICE_HOST") - os.Unsetenv("KUBERNETES_SERVICE_HOST") - defer os.Setenv("KUBERNETES_SERVICE_HOST", orig) - + t.Run("job volume path missing", func(t *testing.T) { d := Driver{} - err := d.SetConfig(map[string]string{ - "IN_CLUSTER": "true", - "JOB_VOLUME_NAME": "cnab-driver-shared", - "JOB_VOLUME_PATH": "/tmp", - }) + settings := validSettings() + settings[SettingJobVolumePath] = "" + err := d.SetConfig(settings) require.Error(t, err) - assert.Contains(t, err.Error(), "error retrieving in-cluster kubernetes configuration") + assert.Contains(t, err.Error(), "setting JOB_VOLUME_PATH is required") }) } From 5bd33acaa924a4426259121f3c5cf5530d705570 Mon Sep 17 00:00:00 2001 From: Carolyn Van Slyck Date: Tue, 2 Feb 2021 14:53:46 -0600 Subject: [PATCH 7/7] Review feedback Signed-off-by: Carolyn Van Slyck --- driver/kubernetes/kubernetes.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 
deletions(-) diff --git a/driver/kubernetes/kubernetes.go b/driver/kubernetes/kubernetes.go index 103413b7..53703c17 100644 --- a/driver/kubernetes/kubernetes.go +++ b/driver/kubernetes/kubernetes.go @@ -75,7 +75,7 @@ type Driver struct { // Set to zero to not use a limit. Defaults to zero. LimitCPU resource.Quantity - // LimitMemory is amount of memory to request and the limit for the bundle's job. + // LimitMemory is the amount of memory to request and the limit for the bundle's job. // Set to zero to not use a limit. Defaults to zero. LimitMemory resource.Quantity @@ -261,7 +261,7 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { } const sharedVolumeName = "cnab-driver-share" - err = k.initJobVolumes(err) + err = k.initJobVolumes() if err != nil { return driver.OperationResult{}, err } @@ -429,11 +429,10 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) { return opResult, opErr.ErrorOrNil() } -func (k *Driver) initJobVolumes(err error) error { - // Store all job input files in ./inputs and outputs in ./outputs on the shared volume - +// Store all job input files in ./inputs and outputs in ./outputs on the shared volume +func (k *Driver) initJobVolumes() error { inputsDir := filepath.Join(k.JobVolumePath, "inputs") - err = os.Mkdir(inputsDir, 0700) + err := os.Mkdir(inputsDir, 0700) if err != nil && !os.IsExist(err) { return errors.Wrapf(err, "error creating inputs directory %s on shared job volume %s", inputsDir, k.JobVolumeName) }