Skip to content

Commit

Permalink
runConfig: added task timeout in global, run and task configuration. …
Browse files Browse the repository at this point in the history
…executor: added tasksTimeoutCleaner for managing task timeout. scheduler: updated executorTaskCleaner for managing task timeout and report it to the executor
  • Loading branch information
alessandro-sorint committed Apr 13, 2022
1 parent 7b39dc8 commit a6a8d61
Show file tree
Hide file tree
Showing 13 changed files with 390 additions and 7 deletions.
4 changes: 4 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"regexp"
"strings"
"time"

"agola.io/agola/internal/errors"
itypes "agola.io/agola/internal/services/types"
Expand Down Expand Up @@ -55,6 +56,7 @@ type Config struct {
Runs []*Run `json:"runs"`

DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeout *time.Duration `json:"task_timeout"`
}

type RuntimeType string
Expand Down Expand Up @@ -113,6 +115,7 @@ type Run struct {
Tasks []*Task `json:"tasks"`
When *When `json:"when"`
DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeout *time.Duration `json:"task_timeout"`
}

type Task struct {
Expand All @@ -128,6 +131,7 @@ type Task struct {
Approval bool `json:"approval"`
When *When `json:"when"`
DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeout *time.Duration `json:"task_timeout"`
}

type DependCondition string
Expand Down
3 changes: 2 additions & 1 deletion internal/migration/runservice/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,11 @@ const (
RunTaskStatusStopped RunTaskStatus = "stopped"
RunTaskStatusSuccess RunTaskStatus = "success"
RunTaskStatusFailed RunTaskStatus = "failed"
RunTaskStatusTimeout RunTaskStatus = "timeout"
)

func (s RunTaskStatus) IsFinished() bool {
return s == RunTaskStatusCancelled || s == RunTaskStatusSkipped || s == RunTaskStatusStopped || s == RunTaskStatusSuccess || s == RunTaskStatusFailed
return s == RunTaskStatusCancelled || s == RunTaskStatusSkipped || s == RunTaskStatusStopped || s == RunTaskStatusSuccess || s == RunTaskStatusFailed || s == RunTaskStatusTimeout
}

type RunTaskFetchPhase string
Expand Down
14 changes: 14 additions & 0 deletions internal/runconfig/runconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,20 @@ func GenRunConfigTasks(uuid util.UUIDGenerator, c *config.Config, runName string
}
}

if c.TaskTimeout != nil {
t.TaskTimeout = *c.TaskTimeout
}

// override with per run task timeout
if cr.TaskTimeout != nil {
t.TaskTimeout = *cr.TaskTimeout
}

// override with per task timeout
if ct.TaskTimeout != nil {
t.TaskTimeout = *ct.TaskTimeout
}

rcts[t.ID] = t
}

Expand Down
157 changes: 157 additions & 0 deletions internal/runconfig/runconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"reflect"
"testing"
"time"

"agola.io/agola/internal/config"
"agola.io/agola/internal/errors"
Expand Down Expand Up @@ -1358,6 +1359,162 @@ func TestGenRunConfig(t *testing.T) {
},
},
},
{
name: "test runconfig generation task timeout global",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
},
},
TaskTimeout: util.DurationP(10 * time.Second),
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeout: 10 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
{
name: "test global task timeout override by run",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
TaskTimeout: util.DurationP(15 * time.Second),
},
},
TaskTimeout: util.DurationP(10 * time.Second),
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeout: 15 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
{
name: "test global task timeout override by task",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
TaskTimeout: util.DurationP(20 * time.Second),
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
TaskTimeout: util.DurationP(15 * time.Second),
},
},
TaskTimeout: util.DurationP(10 * time.Second),
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeout: 20 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
}

for _, tt := range tests {
Expand Down
65 changes: 62 additions & 3 deletions internal/services/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,9 @@ func (e *Executor) executeTask(rt *runningTask) {
rt.Lock()
if err != nil {
e.log.Err(err).Send()
if rt.et.Spec.Stop {
if rt.et.Spec.Timeout {
et.Status.Phase = types.ExecutorTaskPhaseTimeout
} else if rt.et.Spec.Stop {
et.Status.Phase = types.ExecutorTaskPhaseStopped
} else {
et.Status.Phase = types.ExecutorTaskPhaseFailed
Expand Down Expand Up @@ -956,14 +958,14 @@ func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod dr
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseSuccess

if err != nil {
if rt.et.Spec.Stop {
if rt.et.Spec.Stop || rt.et.Spec.Timeout {
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseStopped
} else {
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed
}
serr = errors.Wrapf(err, "failed to execute step %s", util.Dump(step))
} else if exitCode != 0 {
if rt.et.Spec.Stop {
if rt.et.Spec.Stop || rt.et.Spec.Timeout {
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseStopped
} else {
rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed
Expand Down Expand Up @@ -1156,10 +1158,21 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
if rt != nil {
rt.Lock()
// update running task Spec.Stop value only when there's a transitions from false to true,
// update running task Spec.Timeout value only when there's a transitions from false to true,
// other spec values cannot change once the task has been scheduled
if !rt.et.Spec.Stop && et.Spec.Stop {
rt.et.Spec.Stop = et.Spec.Stop

// cancel the running task
rt.cancel()
} else if !rt.et.Spec.Timeout && et.Spec.Timeout {
rt.et.Spec.Timeout = et.Spec.Timeout
rt.et.Status.Phase = types.ExecutorTaskPhaseTimeout
if et.Status.EndTime != nil {
rt.et.Status.EndTime = et.Status.EndTime
} else {
rt.et.Status.EndTime = util.TimeP(time.Now())
}
// cancel the running task
rt.cancel()
}
Expand Down Expand Up @@ -1481,6 +1494,7 @@ func (e *Executor) Run(ctx context.Context) error {
go e.podsCleanerLoop(ctx)
go e.tasksUpdaterLoop(ctx)
go e.tasksDataCleanerLoop(ctx)
go e.tasksTimeoutCleanerLoop(ctx)

go e.handleTasks(ctx, ch)

Expand Down Expand Up @@ -1510,3 +1524,48 @@ func (e *Executor) Run(ctx context.Context) error {

return nil
}

func (e *Executor) tasksTimeoutCleanerLoop(ctx context.Context) {
for {
e.log.Debug().Msgf("tasksTimeoutCleaner")

e.tasksTimeoutCleaner(ctx)

sleepCh := time.NewTimer(2 * time.Second).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}

func (e *Executor) tasksTimeoutCleaner(ctx context.Context) {
for _, rtID := range e.runningTasks.ids() {
if rt, ok := e.runningTasks.get(rtID); ok {
if rt.et.Status.Phase == types.ExecutorTaskPhaseRunning && rt.et.Spec.TaskTimeout != 0 && rt.et.Status.StartTime.Add(rt.et.Spec.TaskTimeout).Before(time.Now()) {
rt.et.Status.Phase = types.ExecutorTaskPhaseTimeout
rt.et.Status.EndTime = util.TimeP(time.Now())
rt.et.Status.FailError = "timeout"

if rt.et.Status.SetupStep.Phase == types.ExecutorTaskPhaseRunning {
rt.et.Status.SetupStep.Phase = types.ExecutorTaskPhaseFailed
rt.et.Status.SetupStep.EndTime = util.TimeP(time.Now())
}
for _, s := range rt.et.Status.Steps {
if s.Phase == types.ExecutorTaskPhaseRunning {
s.Phase = types.ExecutorTaskPhaseFailed
s.EndTime = util.TimeP(time.Now())
}
}

if err := e.sendExecutorTaskStatus(ctx, rt.et); err != nil {
e.log.Err(err).Send()
}

rt.cancel()
e.runningTasks.delete(rtID)
}
}
}
}
4 changes: 4 additions & 0 deletions internal/services/gateway/api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.Run

Level: rct.Level,
Depends: rct.Depends,

TaskTimeout: rct.TaskTimeout,
}

return t
Expand All @@ -95,6 +97,8 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwa

StartTime: rt.StartTime,
EndTime: rt.EndTime,

TaskTimeout: rct.TaskTimeout,
}

t.SetupStep = &gwapitypes.RunTaskResponseSetupStep{
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func GenExecutorTaskSpecData(r *types.Run, rt *types.RunTask, rc *types.RunConfi
Steps: rct.Steps,
CachePrefix: cachePrefix,
DockerRegistriesAuth: rct.DockerRegistriesAuth,
TaskTimeout: rct.TaskTimeout,
}

// calculate workspace operations
Expand Down
Loading

0 comments on commit a6a8d61

Please sign in to comment.