Skip to content

Commit

Permalink
Improve defaulting for kubernetes driver
Browse files Browse the repository at this point in the history
* When LimitCPU or LimitMemory is 0, do not set a limit. We don't set a
request size, and requesting 0 is not the same as not having a requested
resource quantity. This can lead to increased pod evictions.
* Clarify that LimitCPU and LimitMemory are also used for the requested
resources.
* When ActiveDeadlineSeconds is 0, do not set a deadline.
* Stop defaulting ActiveDeadlineSeconds to 5 minutes. It will cut off a
bundle mid-execution, which isn't a great default.
* Split reading the configuration in SetConfig and attempting to connect
to the cluster, so that we can unit test the settings logic.
* Move validation for KUBE_NAMESPACE into SetConfig so that any errors
can be reported immediately.
* Add comments for all of the driver settings

Signed-off-by: Carolyn Van Slyck <[email protected]>
  • Loading branch information
carolynvs committed Feb 1, 2021
1 parent b6af7cb commit 5e72b5a
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 85 deletions.
186 changes: 145 additions & 41 deletions driver/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
SettingKubeNamespace = "KUBE_NAMESPACE"
SettingServiceAccount = "SERVICE_ACCOUNT"
SettingKubeconfig = "KUBECONFIG"
SettingMasterUrl = "MASTER_URL"
SettingMasterURL = "MASTER_URL"
)

var (
Expand All @@ -55,23 +55,77 @@ var (

// Driver runs an invocation image in a Kubernetes cluster.
type Driver struct {
	// Namespace where the bundle's job should be executed. Required.
	Namespace string

	// ServiceAccountName is the name of the ServiceAccount under which the
	// bundle's job should be executed. Leave blank to execute as the default
	// ServiceAccount of the namespace.
	ServiceAccountName string

	// Annotations that should be applied to any Kubernetes resources created
	// by the driver.
	Annotations map[string]string

	// Labels that should be applied to any Kubernetes resources created
	// by the driver.
	Labels []string

	// LimitCPU is the amount of CPU to request and the limit for the bundle's job.
	// Set to zero to not use a limit. Defaults to zero.
	LimitCPU resource.Quantity

	// LimitMemory is the amount of memory to request and the limit for the
	// bundle's job. Set to zero to not use a limit. Defaults to zero.
	LimitMemory resource.Quantity

	// JobVolumePath is the local path where the persistent volume is mounted
	// to share data between the driver and the bundle.
	JobVolumePath string

	// JobVolumeName is the name of the persistent volume claim that should be mounted
	// to the bundle's pod to share data between the driver and the bundle.
	//
	// Files that should be injected into the bundle are stored in ./inputs and the
	// directory ./outputs is mounted to /cnab/app/outputs to collect any bundle
	// outputs generated.
	JobVolumeName string

	// Tolerations is an optional list of tolerations to apply to the bundle's job.
	Tolerations []v1.Toleration

	// ActiveDeadlineSeconds is the time limit for running the driver's
	// execution, including retries. Set to 0 to not use a deadline. Defaults
	// to 0 (no deadline).
	//
	// Setting this value to a non-zero value can cause bundles that would have
	// been successful, or that have even completed successfully, to halt abruptly
	// before the bundle's execution run can be recorded in claim storage.
	ActiveDeadlineSeconds int64

	// BackoffLimit is the number of times to retry the driver's
	// execution. Defaults to 0, so failed executions will not be retried.
	BackoffLimit int32

	// SkipCleanup specifies if the driver should skip removing any Kubernetes
	// resources that it created when the driver execution completes.
	// Defaults to false.
	SkipCleanup bool

	// InCluster indicates if the driver should connect to the cluster using
	// in-cluster environment variables.
	InCluster bool

	// Kubeconfig is the absolute path to the kubeconfig file.
	Kubeconfig string

	// MasterURL is the Kubernetes API endpoint.
	MasterURL string

	// Unexported state. The jobs, secrets and pods clients may be preset (for
	// example by a test); initClient leaves them untouched when jobs is
	// already non-nil.
	skipJobStatusCheck bool
	jobs               batchclientv1.JobInterface
	secrets            coreclientv1.SecretInterface
	pods               coreclientv1.PodInterface
	deletionPolicy     metav1.DeletionPropagation
}

// New initializes a Kubernetes driver.
Expand Down Expand Up @@ -101,14 +155,18 @@ func (k *Driver) Config() map[string]string {
SettingKubeNamespace: "Kubernetes namespace in which to run the invocation image",
SettingServiceAccount: "Kubernetes service account to be mounted by the invocation image (if empty, no service account token will be mounted)",
SettingKubeconfig: "Absolute path to the kubeconfig file",
SettingMasterUrl: "Kubernetes master endpoint",
SettingMasterURL: "Kubernetes master endpoint",
}
}

// SetConfig sets Kubernetes driver configuration.
func (k *Driver) SetConfig(settings map[string]string) error {
k.setDefaults()
k.Namespace = settings[SettingKubeNamespace]
if k.Namespace == "" {
return errors.Errorf("setting %s is required", SettingKubeNamespace)
}

k.ServiceAccountName = settings[SettingServiceAccount]
k.Labels = strings.Split(settings[SettingLabels], " ")

Expand All @@ -126,36 +184,59 @@ func (k *Driver) SetConfig(settings map[string]string) error {
k.SkipCleanup = !cleanup
}

var conf *rest.Config
if incluster, _ := strconv.ParseBool(settings[SettingInCluster]); incluster {
conf, err = rest.InClusterConfig()
if inClusterVal, ok := settings[SettingInCluster]; ok {
inCluster, err := strconv.ParseBool(inClusterVal)
if err != nil {
return errors.Wrap(err, "error retrieving in-cluster kubernetes configuration")
return errors.Wrapf(err, "invalid value %q for %s", inClusterVal, SettingInCluster)
}
} else {
k.InCluster = inCluster
}

if !k.InCluster {
var kubeconfig string
if kpath := settings[SettingKubeconfig]; kpath != "" {
kubeconfig = kpath
} else if home := homeDir(); home != "" {
kubeconfig = filepath.Join(home, ".kube", "config")
}

conf, err = clientcmd.BuildConfigFromFlags(settings[SettingMasterUrl], kubeconfig)
if err != nil {
return errors.Wrapf(err, "error retrieving external kubernetes configuration using configuration:\n%v", settings)
}
k.Kubeconfig = kubeconfig
k.MasterURL = settings[SettingMasterURL]
}

return k.setClient(conf)
return nil
}

// setDefaults resets the driver's optional settings to their default values:
// resources are cleaned up, failed executions are not retried, no deadline is
// applied, and deletions propagate in the background.
func (k *Driver) setDefaults() {
	k.SkipCleanup = false
	k.BackoffLimit = 0
	k.ActiveDeadlineSeconds = 0 // Default to not cutting off a bundle mid-run
	k.deletionPolicy = metav1.DeletePropagationBackground
}

// initClient builds the Kubernetes clients used by the driver from the
// connection settings stored on the Driver (InCluster, MasterURL, Kubeconfig).
// It is a no-op when a jobs client is already present, which lets tests
// supply their own clients up front.
func (k *Driver) initClient() error {
	// A test may have already configured a client; keep it.
	if k.jobs != nil {
		return nil
	}

	if k.InCluster {
		inClusterConf, err := rest.InClusterConfig()
		if err != nil {
			return errors.Wrap(err, "error retrieving in-cluster kubernetes configuration")
		}
		return k.setClient(inClusterConf)
	}

	externalConf, err := clientcmd.BuildConfigFromFlags(k.MasterURL, k.Kubeconfig)
	if err != nil {
		return errors.Wrapf(err, "error retrieving external kubernetes configuration for %s with kubeconfig %s", k.MasterURL, k.Kubeconfig)
	}
	return k.setClient(externalConf)
}

func (k *Driver) setClient(conf *rest.Config) error {
coreClient, err := coreclientv1.NewForConfig(conf)
if err != nil {
Expand All @@ -174,8 +255,9 @@ func (k *Driver) setClient(conf *rest.Config) error {

// Run executes the operation inside of the invocation image.
func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) {
if k.Namespace == "" {
return driver.OperationResult{}, fmt.Errorf("KUBE_NAMESPACE is required")
err := k.initClient()
if err != nil {
return driver.OperationResult{}, err
}

const sharedVolumeName = "cnab-driver-share"
Expand Down Expand Up @@ -207,7 +289,7 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) {
job := &batchv1.Job{
ObjectMeta: meta,
Spec: batchv1.JobSpec{
ActiveDeadlineSeconds: &k.ActiveDeadlineSeconds,
ActiveDeadlineSeconds: defaultInt64Ptr(k.ActiveDeadlineSeconds),
Completions: defaultInt32Ptr(1),
BackoffLimit: &k.BackoffLimit,
Template: v1.PodTemplateSpec{
Expand Down Expand Up @@ -241,15 +323,9 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) {
}

container := v1.Container{
Name: k8sContainerName,
Image: img,
Command: []string{"/cnab/app/run"},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: k.LimitCPU,
v1.ResourceMemory: k.LimitMemory,
},
},
Name: k8sContainerName,
Image: img,
Command: []string{"/cnab/app/run"},
ImagePullPolicy: v1.PullIfNotPresent,
VolumeMounts: []v1.VolumeMount{
{
Expand All @@ -260,6 +336,14 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) {
},
}

// Only configured (non-zero) limits are applied; a zero quantity means the
// corresponding limit is left unset.
//
// Assigning into a nil map panics, so make sure the Limits map exists before
// writing to it. NOTE(review): Limits is nil here whenever the container
// literal does not initialize Resources — confirm against the literal above.
if container.Resources.Limits == nil && (!k.LimitCPU.IsZero() || !k.LimitMemory.IsZero()) {
	container.Resources.Limits = v1.ResourceList{}
}

if !k.LimitCPU.IsZero() {
	container.Resources.Limits[v1.ResourceCPU] = k.LimitCPU
}

if !k.LimitMemory.IsZero() {
	container.Resources.Limits[v1.ResourceMemory] = k.LimitMemory
}

if len(op.Environment) > 0 {
secret := &v1.Secret{
ObjectMeta: meta,
Expand Down Expand Up @@ -363,6 +447,26 @@ func (k *Driver) initJobVolumes(err error) error {
return nil
}

// defaultInt64Ptr returns a pointer to a copy of value, or nil when value is
// zero or negative, so that non-positive settings are treated as "unset".
func defaultInt64Ptr(value int64) *int64 {
	if value <= 0 {
		return nil
	}
	return &value
}

// defaultInt32Ptr returns a pointer to a copy of value, or nil when value is
// zero or negative, so that non-positive settings are treated as "unset".
func defaultInt32Ptr(value int32) *int32 {
	if value <= 0 {
		return nil
	}
	return &value
}

// fetchOutputs collects any outputs created by the job that were persisted to JobVolumeName (which is mounted locally
// at JobVolumePath).
//
Expand Down
17 changes: 5 additions & 12 deletions driver/kubernetes/kubernetes_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,12 @@ func TestDriver_Run_Integration(t *testing.T) {
}
}

func TestDriver_SetConfig(t *testing.T) {
t.Run("cleanup_jobs", func(t *testing.T) {
d := Driver{}
d.SetConfig(map[string]string{
"CLEANUP_JOBS": "false",
})
assert.True(t, d.SkipCleanup)
})
func TestDriver_InitClient(t *testing.T) {
t.Run("kubeconfig", func(t *testing.T) {
d := Driver{}
err := d.SetConfig(map[string]string{
"KUBECONFIG": os.Getenv("KUBECONFIG"),
})
d := Driver{
Kubeconfig: os.Getenv("KUBECONFIG"),
}
err := d.initClient()
require.NoError(t, err)
})
}
Loading

0 comments on commit 5e72b5a

Please sign in to comment.