Skip to content

Commit

Permalink
E2E Task Priority
Browse files Browse the repository at this point in the history
  • Loading branch information
Rajadeepan D Ramesh committed Jul 24, 2019
1 parent 24527ea commit 8f17231
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 5 deletions.
42 changes: 42 additions & 0 deletions test/e2e/job_error_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package e2e
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"strconv"

"k8s.io/api/core/v1"

Expand Down Expand Up @@ -597,4 +598,45 @@ var _ = Describe("Job Error Handling", func() {
Expect(err).NotTo(HaveOccurred())
})

It("Task Priority", func() {
By("init test context")
context := initTestContext()
defer cleanupTestContext(context)

rep := clusterSize(context, oneCPU)
nodecount := clusterNodeNumber(context)
By("create job")
job := createJob(context, &jobSpec{
name: "task-priority-job",
min: int32(nodecount),
tasks: []taskSpec{
{
name: "higherprioritytask",
img: defaultNginxImage,
rep: int32(nodecount),
req: cpuResource(strconv.Itoa(int(rep)/nodecount - 1)),
taskpriority: masterPriority,
},
{
name: "lowerprioritytask",
img: defaultNginxImage,
rep: int32(nodecount),
req: cpuResource(strconv.Itoa(int(rep)/nodecount - 1)),
taskpriority: workerPriority,
},
},
})

// job phase: pending -> running
err := waitJobPhases(context, job, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running})
Expect(err).NotTo(HaveOccurred())
expteced := map[string]int{
masterPriority: int(nodecount),
workerPriority: 0,
}

err = waitTasksReadyEx(context, job, expteced)
Expect(err).NotTo(HaveOccurred())
})

})
51 changes: 46 additions & 5 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ type taskSpec struct {
restartPolicy v1.RestartPolicy
tolerations []v1.Toleration
defaultGracefulPeriod *int64
taskpriority string
}

type jobSpec struct {
Expand Down Expand Up @@ -343,11 +344,12 @@ func createJobInner(context *context, jobSpec *jobSpec) (*vkv1.Job, error) {
Labels: task.labels,
},
Spec: v1.PodSpec{
SchedulerName: "volcano",
RestartPolicy: restartPolicy,
Containers: createContainers(task.img, task.command, task.workingDir, task.req, task.limit, task.hostport),
Affinity: task.affinity,
Tolerations: task.tolerations,
SchedulerName: "volcano",
RestartPolicy: restartPolicy,
Containers: createContainers(task.img, task.command, task.workingDir, task.req, task.limit, task.hostport),
Affinity: task.affinity,
Tolerations: task.tolerations,
PriorityClassName: task.taskpriority,
},
},
}
Expand Down Expand Up @@ -410,6 +412,41 @@ func waitTaskPhase(ctx *context, job *vkv1.Job, phase []v1.PodPhase, taskNum int
return err
}

func taskPhaseEx(ctx *context, job *vkv1.Job, phase []v1.PodPhase, taskNum map[string]int) error {
err := wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) {

pods, err := ctx.kubeclient.CoreV1().Pods(job.Namespace).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())

readyTaskNum := map[string]int{}
for _, pod := range pods.Items {
if !metav1.IsControlledBy(&pod, job) {
continue
}

for _, p := range phase {
if pod.Status.Phase == p {
readyTaskNum[pod.Spec.PriorityClassName]++
break
}
}
}

for k, v := range taskNum {
if v > readyTaskNum[k] {
return false, nil
}
}

return true, nil
})
if err != nil && strings.Contains(err.Error(), timeOutMessage) {
return fmt.Errorf("[Wait time out]")
}
return err

}

func jobUnschedulable(ctx *context, job *vkv1.Job, now time.Time) error {
var additionalError error
// TODO(k82cn): check Job's Condition instead of PodGroup's event.
Expand Down Expand Up @@ -623,6 +660,10 @@ func waitTasksReady(ctx *context, job *vkv1.Job, taskNum int) error {
return waitTaskPhase(ctx, job, []v1.PodPhase{v1.PodRunning, v1.PodSucceeded}, taskNum)
}

func waitTasksReadyEx(ctx *context, job *vkv1.Job, taskNum map[string]int) error {
return taskPhaseEx(ctx, job, []v1.PodPhase{v1.PodRunning, v1.PodSucceeded}, taskNum)
}

func waitTasksPending(ctx *context, job *vkv1.Job, taskNum int) error {
return waitTaskPhase(ctx, job, []v1.PodPhase{v1.PodPending}, taskNum)
}
Expand Down

0 comments on commit 8f17231

Please sign in to comment.