Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runservice: use all scheduled tasks in scheduleRun #220

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 14 additions & 29 deletions internal/services/runservice/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,6 @@ const (
defaultExecutorNotAliveInterval = 60 * time.Second
)

func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) {
// the real source of active tasks is the number of executor tasks in etcd
// we can't rely on RunTask.Status since it's only updated when receiveing
// updated from the executor so it could be in a NotStarted state but have an
// executor tasks scheduled and running
ets, err := store.GetExecutorTasksForRun(ctx, s.e, runID)
if err != nil {
return nil, err
}
activeTasks := []*types.ExecutorTask{}
for _, et := range ets {
if !et.Status.Phase.IsFinished() {
activeTasks = append(activeTasks, et)
}
}

return activeTasks, nil
}

func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r *types.Run, rc *types.RunConfig) bool {
rct := rc.Tasks[rt.ID]
parents := runconfig.GetParents(rc.Tasks, rct)
Expand Down Expand Up @@ -96,7 +77,7 @@ func taskMatchesParentDependCondition(ctx context.Context, rt *types.RunTask, r
return len(parents) == matchedNum
}

func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) (*types.Run, error) {
func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig, scheduledExecutorTasks []*types.ExecutorTask) (*types.Run, error) {
log.Debugf("run: %s", util.Dump(curRun))
log.Debugf("rc: %s", util.Dump(rc))

Expand All @@ -107,7 +88,7 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig
// if the run is set to stop, skip all not running tasks
for _, rt := range newRun.Tasks {
isScheduled := false
for _, et := range activeExecutorTasks {
for _, et := range scheduledExecutorTasks {
if rt.ID == et.ID {
isScheduled = true
}
Expand Down Expand Up @@ -139,7 +120,7 @@ func advanceRunTasks(ctx context.Context, curRun *types.Run, rc *types.RunConfig
// cancel task if the run has a result set and is not yet scheduled
if curRun.Result.IsSet() {
isScheduled := false
for _, et := range activeExecutorTasks {
for _, et := range scheduledExecutorTasks {
if rt.ID == et.ID {
isScheduled = true
}
Expand Down Expand Up @@ -480,12 +461,16 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru
prevPhase := r.Phase
prevResult := r.Result

activeExecutorTasks, err := s.runActiveExecutorTasks(ctx, r.ID)
// the real source of active tasks is the number of executor tasks in etcd
// we can't rely on RunTask.Status since it's only updated when receiveing
// updated from the executor so it could be in a NotStarted state but have an
// executor tasks scheduled and running
scheduledExecutorTasks, err := store.GetExecutorTasksForRun(ctx, s.e, r.ID)
if err != nil {
return err
}

if err := advanceRun(ctx, r, rc, activeExecutorTasks); err != nil {
if err := advanceRun(ctx, r, rc, scheduledExecutorTasks); err != nil {
return err
}

Expand All @@ -506,7 +491,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru

// if the run is set to stop, stop all active tasks
if r.Stop {
for _, et := range activeExecutorTasks {
for _, et := range scheduledExecutorTasks {
et.Spec.Stop = true
if _, err := store.AtomicPutExecutorTask(ctx, s.e, et); err != nil {
return err
Expand All @@ -519,7 +504,7 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru

// advance tasks
if r.Phase == types.RunPhaseRunning {
r, err := advanceRunTasks(ctx, r, rc, activeExecutorTasks)
r, err := advanceRunTasks(ctx, r, rc, scheduledExecutorTasks)
if err != nil {
return err
}
Expand All @@ -541,9 +526,9 @@ func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.Ru

// advanceRun updates the run result and phase. It must be the unique function that
// should update them.
func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeExecutorTasks []*types.ExecutorTask) error {
func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, scheduledExecutorTasks []*types.ExecutorTask) error {
log.Debugf("run: %s", util.Dump(r))
hasActiveTasks := len(activeExecutorTasks) > 0
hasScheduledTasks := len(scheduledExecutorTasks) > 0

// fail run if a task is failed
if !r.Result.IsSet() && r.Phase == types.RunPhaseRunning {
Expand Down Expand Up @@ -595,7 +580,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx
}

if finished && !r.Phase.IsFinished() {
if !hasActiveTasks {
if !hasScheduledTasks {
r.ChangePhase(types.RunPhaseFinished)
}
}
Expand Down
16 changes: 8 additions & 8 deletions internal/services/runservice/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ func TestAdvanceRunTasks(t *testing.T) {
}

tests := []struct {
name string
rc *types.RunConfig
r *types.Run
activeExecutorTasks []*types.ExecutorTask
out *types.Run
name string
rc *types.RunConfig
r *types.Run
scheduledExecutorTasks []*types.ExecutorTask
out *types.Run
}{
{
name: "test top level task not started",
Expand Down Expand Up @@ -343,7 +343,7 @@ func TestAdvanceRunTasks(t *testing.T) {
run.Tasks["task04"].Status = types.RunTaskStatusSuccess
return run
}(),
activeExecutorTasks: []*types.ExecutorTask{
scheduledExecutorTasks: []*types.ExecutorTask{
&types.ExecutorTask{ID: "task01"},
},
out: func() *types.Run {
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestAdvanceRunTasks(t *testing.T) {
run.Stop = true
return run
}(),
activeExecutorTasks: []*types.ExecutorTask{
scheduledExecutorTasks: []*types.ExecutorTask{
&types.ExecutorTask{ID: "task01"},
},
out: func() *types.Run {
Expand All @@ -390,7 +390,7 @@ func TestAdvanceRunTasks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.activeExecutorTasks)
r, err := advanceRunTasks(ctx, tt.r, tt.rc, tt.scheduledExecutorTasks)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down