Skip to content

Commit

Permalink
feat: add external job-delegation for vcn
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Kotzbauer <[email protected]>
  • Loading branch information
ckotzbauer committed Apr 25, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 42a9238 commit e0acaea
Showing 10 changed files with 420 additions and 38 deletions.
3 changes: 3 additions & 0 deletions internal/config.go
Original file line number Diff line number Diff line change
@@ -18,4 +18,7 @@ var (
ConfigKeyDependencyTrackBaseUrl = "dtrack-base-url"
ConfigKeyDependencyTrackApiKey = "dtrack-api-key"
ConfigKeyKubernetesClusterId = "kubernetes-cluster-id"
ConfigKeyJobImage = "job-image"
ConfigKeyJobImagePullSecret = "job-image-pull-secret"
ConfigKeyJobTimeout = "job-timeout"
)
48 changes: 42 additions & 6 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"time"

"github.com/ckotzbauer/sbom-operator/internal"
"github.com/ckotzbauer/sbom-operator/internal/job"
"github.com/ckotzbauer/sbom-operator/internal/kubernetes"
"github.com/ckotzbauer/sbom-operator/internal/syft"
"github.com/ckotzbauer/sbom-operator/internal/target"
@@ -24,9 +25,13 @@ func Start(cronTime string) {
targetKeys := viper.GetStringSlice(internal.ConfigKeyTargets)

logrus.Debugf("Cron set to: %v", cr)
logrus.Debugf("Targets set to: %v", targetKeys)
targets := make([]target.Target, 0)

if !hasJobImage() {
logrus.Debugf("Targets set to: %v", targetKeys)
targets = initTargets(targetKeys)
}

targets := initTargets(targetKeys)
cs := CronService{cron: cr, targets: targets}
cs.printNextExecution()

@@ -55,15 +60,29 @@ func (c *CronService) runBackgroundService() {
logrus.Info("Execute background-service")
format := viper.GetString(internal.ConfigKeyFormat)

for _, t := range c.targets {
t.Initialize()
if !hasJobImage() {
for _, t := range c.targets {
t.Initialize()
}
}

k8s := kubernetes.NewClient()
namespaces := k8s.ListNamespaces(viper.GetString(internal.ConfigKeyNamespaceLabelSelector))
logrus.Debugf("Discovered %v namespaces", len(namespaces))
containerImages, allImages := k8s.LoadImageInfos(namespaces, viper.GetString(internal.ConfigKeyPodLabelSelector))

if !hasJobImage() {
c.executeSyftScans(format, k8s, containerImages, allImages)
} else {
executeJobImage(k8s, containerImages)
}

c.printNextExecution()
running = false
}

func (c *CronService) executeSyftScans(format string, k8s *kubernetes.KubeClient,
containerImages map[string]kubernetes.ContainerImage, allImages []kubernetes.ContainerImage) {
sy := syft.New(format)

for _, image := range containerImages {
@@ -90,9 +109,22 @@ func (c *CronService) runBackgroundService() {
for _, t := range c.targets {
t.Cleanup(allImages)
}
}

c.printNextExecution()
running = false
func executeJobImage(k8s *kubernetes.KubeClient, containerImages map[string]kubernetes.ContainerImage) {
j, err := job.StartJob(k8s, containerImages)
if err != nil {
// Already handled from job-module
return
}

if job.WaitForJob(k8s, j) {
for _, i := range containerImages {
for _, pod := range i.Pods {
k8s.UpdatePodAnnotation(pod)
}
}
}
}

func initTargets(targetKeys []string) []target.Target {
@@ -124,3 +156,7 @@ func initTargets(targetKeys []string) []target.Target {

return targets
}

func hasJobImage() bool {
return viper.GetString(internal.ConfigKeyJobImage) != ""
}
113 changes: 113 additions & 0 deletions internal/job/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package job

import (
"context"
"encoding/json"
"os"
"strings"
"time"

batchv1 "k8s.io/api/batch/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ckotzbauer/sbom-operator/internal"
"github.com/ckotzbauer/sbom-operator/internal/kubernetes"
"github.com/ckotzbauer/sbom-operator/internal/registry"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

type imageConfig struct {
Host string `json:"registry-host"`
User string `json:"registry-user"`
Password string `json:"registry-password"`
Image string `json:"image"`
}

func StartJob(k8s *kubernetes.KubeClient, images map[string]kubernetes.ContainerImage) (*batchv1.Job, error) {
configs := make([]imageConfig, 0)
jobImage := viper.GetString(internal.ConfigKeyJobImage)
jobPullSecret := viper.GetString(internal.ConfigKeyJobImagePullSecret)
jobTimeout := viper.GetInt64(internal.ConfigKeyJobTimeout)
podNamespace := os.Getenv("POD_NAMESPACE")

for _, image := range images {
cfg, err := registry.ResolveAuthConfig(image)
if err != nil {
logrus.WithError(err).Error("Error occurred during auth-resolve")
return nil, err
}

configs = append(configs, imageConfig{
Host: cfg.ServerAddress,
User: cfg.Username,
Password: cfg.Password,
Image: image.ImageID,
})
}

bytes, err := json.Marshal(configs)
if err != nil {
logrus.WithError(err).Error("Error occurred during config-marshal")
return nil, err
}

suffix := generateObjectSuffix()
err = k8s.CreateJobSecret(podNamespace, suffix, bytes)
if err != nil {
logrus.WithError(err).Error("Error occurred during job-secret creation/update")
return nil, err
}

job, err := k8s.CreateJob(podNamespace, suffix, jobImage, jobPullSecret, jobTimeout, getJobEnvs())
if err != nil {
logrus.WithError(err).Error("Error occurred during job creation/update")
return nil, err
}

logrus.Infof("Created job %s-%s", kubernetes.JobName, suffix)
return job, nil
}

func WaitForJob(k8s *kubernetes.KubeClient, job *batchv1.Job) bool {
for {
job, err := k8s.Client.BatchV1().Jobs(job.Namespace).Get(context.Background(), job.Name, meta.GetOptions{})
if err != nil {
logrus.WithError(err).Warnf("Error while waiting for job %s.", job.Name)
return false
}

pending := job.Status.Active == 0 && job.Status.Succeeded == 0 && job.Status.Failed == 0
succeeded := job.Status.Active == 0 && job.Status.Succeeded > 0
failed := job.Status.Active == 0 && job.Status.Failed > 0

if !pending && succeeded {
logrus.Infof("Job succeeded %s", job.Name)
return true
} else if !pending && failed {
logrus.Infof("Job failed %s", job.Name)
return false
}

time.Sleep(10 * time.Second)
}
}

func generateObjectSuffix() string {
t := time.Now()
return t.Format("20060102-150405")
}

func getJobEnvs() map[string]string {
m := make(map[string]string)

for _, v := range os.Environ() {
splitted := strings.Split(v, "=")
if strings.HasPrefix(splitted[0], "SBOM_JOB_") {
key := strings.Replace(splitted[0], "SBOM_JOB_", "", 1)
m[key] = splitted[1]
}
}

return m
}
106 changes: 106 additions & 0 deletions internal/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -3,13 +3,16 @@ package kubernetes
import (
"context"
"fmt"
"os"
"strings"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

@@ -31,6 +34,8 @@ type KubeClient struct {

var (
annotationTemplate = "ckotzbauer.sbom-operator.io/%s"
jobSecretName = "sbom-operator-job-config"
JobName = "sbom-operator-job"
)

func NewClient() *KubeClient {
@@ -217,3 +222,104 @@ func (client *KubeClient) loadSecrets(namespace string, secrets []corev1.LocalOb

return nil, false, nil
}

func (client *KubeClient) CreateJobSecret(namespace, suffix string, data []byte) error {
m := make(map[string][]byte)
m["image-config.json"] = data
vTrue := true
vFalse := false

s := &corev1.Secret{
ObjectMeta: meta.ObjectMeta{
Namespace: namespace,
Name: fmt.Sprintf("%s-%s", jobSecretName, suffix),
OwnerReferences: []meta.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: os.Getenv("POD_NAME"),
UID: types.UID(os.Getenv("POD_UID")),
BlockOwnerDeletion: &vTrue,
Controller: &vFalse,
},
},
},
Data: m,
}

_, err := client.Client.CoreV1().Secrets(namespace).Create(context.Background(), s, meta.CreateOptions{})
return err
}

func (client *KubeClient) CreateJob(namespace, suffix, image, pullSecrets string, timeout int64, envs map[string]string) (*batchv1.Job, error) {
backoffLimit := int32(0)
vTrue := true
vFalse := false

j := &batchv1.Job{
ObjectMeta: meta.ObjectMeta{
Namespace: namespace,
Name: fmt.Sprintf("%s-%s", JobName, suffix),
OwnerReferences: []meta.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: os.Getenv("POD_NAME"),
UID: types.UID(os.Getenv("POD_UID")),
BlockOwnerDeletion: &vTrue,
Controller: &vFalse,
},
},
},
Spec: batchv1.JobSpec{
BackoffLimit: &backoffLimit,
ActiveDeadlineSeconds: &timeout,
Template: corev1.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
Name: JobName,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "sbom",
Image: image,
Env: mapToEnvVars(envs),
SecurityContext: &corev1.SecurityContext{
Privileged: &vTrue,
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "config",
MountPath: "/sbom",
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
ImagePullSecrets: []corev1.LocalObjectReference{{Name: pullSecrets}},
Volumes: []corev1.Volume{
{
Name: "config",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: fmt.Sprintf("%s-%s", jobSecretName, suffix),
},
},
},
},
},
},
},
}

return client.Client.BatchV1().Jobs(namespace).Create(context.Background(), j, meta.CreateOptions{})
}

func mapToEnvVars(m map[string]string) []corev1.EnvVar {
vars := make([]corev1.EnvVar, 0)
for k, v := range m {
vars = append(vars, corev1.EnvVar{Name: k, Value: v})
}

return vars
}
Loading

0 comments on commit e0acaea

Please sign in to comment.