Skip to content

Commit

Permalink
Combine input & output volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyLike committed May 5, 2019
1 parent 072b95d commit fa9fcc4
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 111 deletions.
7 changes: 3 additions & 4 deletions pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
}
}

//TODO(tommylikehu): Fix me and enable it.
//if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok {
// msg = msg + validateInfo
//}
if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok {
msg = msg + validateInfo
}

if msg != "" {
reviewResponse.Allowed = false
Expand Down
35 changes: 2 additions & 33 deletions pkg/admission/mutate_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ package admission
import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"time"

"github.com/golang/glog"
"strconv"

"k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

type patchOperation struct {
Expand Down Expand Up @@ -74,7 +71,6 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
func createPatch(job v1alpha1.Job) ([]byte, error) {
var patch []patchOperation
patch = append(patch, mutateSpec(job.Spec.Tasks, "/spec/tasks")...)
patch = append(patch, mutateMetadata(job.ObjectMeta, "/metadata")...)

return json.Marshal(patch)
}
Expand All @@ -95,30 +91,3 @@ func mutateSpec(tasks []v1alpha1.TaskSpec, basePath string) (patch []patchOperat

return patch
}

func mutateMetadata(metadata metav1.ObjectMeta, basePath string) (patch []patchOperation) {
if len(metadata.Annotations) == 0 {
metadata.Annotations = make(map[string]string)
}
randomStr := genRandomStr(5)
metadata.Annotations[PVCInputName] = fmt.Sprintf("%s-input-%s", metadata.Name, randomStr)
metadata.Annotations[PVCOutputName] = fmt.Sprintf("%s-output-%s", metadata.Name, randomStr)
patch = append(patch, patchOperation{
Op: "replace",
Path: basePath,
Value: metadata,
})

return patch
}

func genRandomStr(l int) string {
str := "0123456789abcdefghijklmnopqrstuvwxyz"
bytes := []byte(str)
result := []byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < l; i++ {
result = append(result, bytes[r.Intn(len(bytes))])
}
return string(result)
}
22 changes: 11 additions & 11 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,29 @@ type JobSpec struct {
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"`

// The volume mount for input of Job
Input *VolumeSpec `json:"input,omitempty" protobuf:"bytes,3,opt,name=input"`

// The volume mount for output of Job
Output *VolumeSpec `json:"output,omitempty" protobuf:"bytes,4,opt,name=output"`
// The volumes mount on Job
Volumes []VolumeSpec `json:"volumes,omitempty" protobuf:"bytes,3,opt,name=volumes"`

// Tasks specifies the task specification of Job
// +optional
Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,5,opt,name=tasks"`
Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,4,opt,name=tasks"`

// Specifies the default lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,6,opt,name=policies"`
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"`

// Specifies the plugin of job
// Key is plugin name, value is the arguments of the plugin
// +optional
Plugins map[string][]string `json:"plugins,omitempty" protobuf:"bytes,7,opt,name=plugins"`
Plugins map[string][]string `json:"plugins,omitempty" protobuf:"bytes,6,opt,name=plugins"`

//Specifies the queue that will be used in the scheduler, "default" queue is used this leaves empty.
Queue string `json:"queue,omitempty" protobuf:"bytes,8,opt,name=queue"`
Queue string `json:"queue,omitempty" protobuf:"bytes,7,opt,name=queue"`

// Specifies the maximum number of retries before marking this Job failed.
// Defaults to 3.
// +optional
MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,9,opt,name=maxRetry"`
MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,8,opt,name=maxRetry"`
}

// VolumeSpec defines the specification of Volume, e.g. PVC
Expand All @@ -81,8 +78,11 @@ type VolumeSpec struct {
// not contain ':'.
MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`

// defined the PVC name
VolumeClaimName string `json:"volumeClaimName,omitempty" protobuf:"bytes,2,opt,name=volumeClaimName"`

// VolumeClaim defines the PVC used by the VolumeMount.
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,1,opt,name=volumeClaim"`
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,3,opt,name=volumeClaim"`
}

type JobEvent string
Expand Down
15 changes: 6 additions & 9 deletions pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion pkg/controllers/job/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package helpers

import (
"fmt"
"math/rand"
"strings"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

const (
Expand All @@ -39,3 +41,14 @@ func GetTaskIndex(pod *v1.Pod) string {
func MakePodName(jobName string, taskName string, index int) string {
return fmt.Sprintf(PodNameFmt, jobName, taskName, index)
}

func GenRandomStr(l int) string {
str := "0123456789abcdefghijklmnopqrstuvwxyz"
bytes := []byte(str)
result := []byte{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < l; i++ {
result = append(result, bytes[r.Intn(len(bytes))])
}
return string(result)
}
77 changes: 24 additions & 53 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

kbapi "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

admissioncontroller "volcano.sh/volcano/pkg/admission"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
"volcano.sh/volcano/pkg/controllers/apis"
Expand Down Expand Up @@ -54,7 +52,7 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: MakePodName(job.Name, template.Name, ix),
Name: vkjobhelpers.MakePodName(job.Name, template.Name, ix),
Namespace: job.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
Expand All @@ -70,61 +68,34 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {
pod.Spec.SchedulerName = job.Spec.SchedulerName
}

inputPVC := job.Annotations[admissioncontroller.PVCInputName]
outputPVC := job.Annotations[admissioncontroller.PVCOutputName]
if job.Spec.Output != nil {
if job.Spec.Output.VolumeClaim == nil {
volume := v1.Volume{
Name: outputPVC,
}
volume.EmptyDir = &v1.EmptyDirVolumeSource{}
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
} else {
volume := v1.Volume{
Name: outputPVC,
}
volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{
ClaimName: outputPVC,
}

pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
}

for i, c := range pod.Spec.Containers {
vm := v1.VolumeMount{
MountPath: job.Spec.Output.MountPath,
Name: outputPVC,
}
pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm)
}
}

if job.Spec.Input != nil {
if job.Spec.Input.VolumeClaim == nil {
volume := v1.Volume{
Name: inputPVC,
}
volume.EmptyDir = &v1.EmptyDirVolumeSource{}
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
} else {
volume := v1.Volume{
Name: inputPVC,
}
volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{
ClaimName: inputPVC,
volumeMap := make(map[string]bool)
for _, volume := range job.Spec.Volumes {
vcName := volume.VolumeClaimName
if _, ok := volumeMap[vcName]; !ok {
if _, ok := job.Status.ControlledResources["volume-emptyDir-"+vcName]; ok && volume.VolumeClaim == nil {
volume := v1.Volume{
Name: vcName,
}
volume.EmptyDir = &v1.EmptyDirVolumeSource{}
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
} else {
volume := v1.Volume{
Name: vcName,
}
volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{
ClaimName: vcName,
}
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
}

pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
volumeMap[vcName] = true
}

for i, c := range pod.Spec.Containers {
vm := v1.VolumeMount{
MountPath: job.Spec.Input.MountPath,
Name: inputPVC,
MountPath: volume.MountPath,
Name: vcName,
}

pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm)

}
}

Expand Down

0 comments on commit fa9fcc4

Please sign in to comment.