Skip to content

Commit

Permalink
runConfig: added task timeout in global, run and task configuration. …
Browse files Browse the repository at this point in the history
…executor: added tasksTimeoutCleaner for managing task timeout. scheduler: updated executorTaskCleaner for managing task timeout and report it to the executor
  • Loading branch information
alessandro-sorint committed Jun 6, 2022
1 parent ae347ab commit 0810247
Show file tree
Hide file tree
Showing 13 changed files with 565 additions and 5 deletions.
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Config struct {
Runs []*Run `json:"runs"`

DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"`
}

type RuntimeType string
Expand Down Expand Up @@ -113,6 +114,7 @@ type Run struct {
Tasks []*Task `json:"tasks"`
When *When `json:"when"`
DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"`
}

type Task struct {
Expand All @@ -128,6 +130,7 @@ type Task struct {
Approval bool `json:"approval"`
When *When `json:"when"`
DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"`
}

type DependCondition string
Expand Down
3 changes: 2 additions & 1 deletion internal/migration/runservice/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,11 @@ const (
RunTaskStatusStopped RunTaskStatus = "stopped"
RunTaskStatusSuccess RunTaskStatus = "success"
RunTaskStatusFailed RunTaskStatus = "failed"
RunTaskStatusTimedout RunTaskStatus = "timedout"
)

// IsFinished reports whether s is a terminal task status: cancelled, skipped,
// stopped, success, failed or timed out.
//
// The timed-out status must be listed here; a leftover return statement that
// predates RunTaskStatusTimedout previously shadowed the updated check and
// made timed-out tasks look unfinished.
func (s RunTaskStatus) IsFinished() bool {
	switch s {
	case RunTaskStatusCancelled,
		RunTaskStatusSkipped,
		RunTaskStatusStopped,
		RunTaskStatusSuccess,
		RunTaskStatusFailed,
		RunTaskStatusTimedout:
		return true
	default:
		return false
	}
}

type RunTaskFetchPhase string
Expand Down
14 changes: 14 additions & 0 deletions internal/runconfig/runconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,20 @@ func GenRunConfigTasks(uuid util.UUIDGenerator, c *config.Config, runName string
}
}

if c.TaskTimeoutInterval != nil {
t.TaskTimeoutInterval = c.TaskTimeoutInterval.Duration
}

// override with per run task timeout
if cr.TaskTimeoutInterval != nil {
t.TaskTimeoutInterval = cr.TaskTimeoutInterval.Duration
}

// override with per task timeout
if ct.TaskTimeoutInterval != nil {
t.TaskTimeoutInterval = ct.TaskTimeoutInterval.Duration
}

rcts[t.ID] = t
}

Expand Down
157 changes: 157 additions & 0 deletions internal/runconfig/runconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"reflect"
"testing"
"time"

"agola.io/agola/internal/config"
"agola.io/agola/internal/errors"
Expand Down Expand Up @@ -1358,6 +1359,162 @@ func TestGenRunConfig(t *testing.T) {
},
},
},
{
name: "test runconfig generation task timeout global",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
},
},
TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second},
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeoutInterval: 10 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
{
name: "test global task timeout override by run",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
TaskTimeoutInterval: &types.Duration{Duration: 15 * time.Second},
},
},
TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second},
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeoutInterval: 15 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
{
name: "test global task timeout override by task",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
TaskTimeoutInterval: &types.Duration{Duration: 20 * time.Second},
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
TaskTimeoutInterval: &types.Duration{Duration: 15 * time.Second},
},
},
TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second},
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeoutInterval: 20 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
}

for _, tt := range tests {
Expand Down
56 changes: 55 additions & 1 deletion internal/services/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,11 @@ func (e *Executor) executeTask(rt *runningTask) {
if err != nil {
e.log.Err(err).Send()
if rt.et.Spec.Stop {
et.Status.Phase = types.ExecutorTaskPhaseStopped
if et.Spec.StopReason == types.RunStopped {
et.Status.Phase = types.ExecutorTaskPhaseStopped
} else {
et.Status.Phase = types.ExecutorTaskPhaseTimedout
}
} else {
et.Status.Phase = types.ExecutorTaskPhaseFailed
}
Expand Down Expand Up @@ -1159,6 +1163,7 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) {
if rt != nil {
rt.Lock()
// update running task Spec.Stop value only when there's a transition from false to true,
// update running task Spec.Timeout value only when there's a transition from false to true,
// other spec values cannot change once the task has been scheduled
if !rt.et.Spec.Stop && et.Spec.Stop {
rt.et.Spec.Stop = et.Spec.Stop
Expand Down Expand Up @@ -1488,6 +1493,7 @@ func (e *Executor) Run(ctx context.Context) error {
go e.podsCleanerLoop(ctx)
go e.tasksUpdaterLoop(ctx)
go e.tasksDataCleanerLoop(ctx)
go e.tasksTimeoutCleanerLoop(ctx)

go e.handleTasks(ctx, ch)

Expand Down Expand Up @@ -1517,3 +1523,51 @@ func (e *Executor) Run(ctx context.Context) error {

return nil
}

// tasksTimeoutCleanerLoop runs tasksTimeoutCleaner repeatedly, pausing two
// seconds between passes, and returns once ctx is done. The first pass runs
// immediately, before any wait.
func (e *Executor) tasksTimeoutCleanerLoop(ctx context.Context) {
	const pause = 2 * time.Second

	for {
		e.log.Debug().Msgf("tasksTimeoutCleaner")

		e.tasksTimeoutCleaner(ctx)

		// Wait for the next pass unless the context ends first.
		select {
		case <-time.After(pause):
			continue
		case <-ctx.Done():
			return
		}
	}
}

// tasksTimeoutCleaner makes one pass over the executor's running tasks and
// times out every task that is in the running phase, has a non-zero
// Spec.TaskTimeoutInterval, and has been running longer than that interval.
//
// For each such task, while holding the task lock, it:
//   - sets the task phase to timed out and records the end time,
//   - marks the setup step and any step still in the running phase as failed,
//   - sends the updated status via sendExecutorTaskStatus (errors are only logged),
//   - calls the running task's cancel function.
//
// A task without a recorded start time is skipped: its elapsed time cannot be
// computed, and dereferencing a nil StartTime would panic with the lock held.
func (e *Executor) tasksTimeoutCleaner(ctx context.Context) {
	for _, rtID := range e.runningTasks.ids() {
		rt, ok := e.runningTasks.get(rtID)
		if !ok {
			// The task was removed between listing the ids and fetching it.
			continue
		}

		rt.Lock()

		timeout := rt.et.Spec.TaskTimeoutInterval
		startTime := rt.et.Status.StartTime
		expired := rt.et.Status.Phase == types.ExecutorTaskPhaseRunning &&
			timeout != 0 &&
			startTime != nil &&
			time.Since(*startTime) > timeout

		if expired {
			// Use one timestamp so the task and its interrupted steps report
			// the same end time.
			now := time.Now()

			rt.et.Status.Phase = types.ExecutorTaskPhaseTimedout
			rt.et.Status.EndTime = util.TimeP(now)

			if rt.et.Status.SetupStep.Phase == types.ExecutorTaskPhaseRunning {
				rt.et.Status.SetupStep.Phase = types.ExecutorTaskPhaseFailed
				rt.et.Status.SetupStep.EndTime = util.TimeP(now)
			}
			for _, s := range rt.et.Status.Steps {
				if s.Phase == types.ExecutorTaskPhaseRunning {
					s.Phase = types.ExecutorTaskPhaseFailed
					s.EndTime = util.TimeP(now)
				}
			}

			// Best effort: a failed status report is logged and the task is
			// still cancelled below.
			if err := e.sendExecutorTaskStatus(ctx, rt.et); err != nil {
				e.log.Err(err).Send()
			}

			rt.cancel()
		}

		rt.Unlock()
	}
}
4 changes: 4 additions & 0 deletions internal/services/gateway/api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.Run

Level: rct.Level,
Depends: rct.Depends,

TaskTimeoutInterval: rct.TaskTimeoutInterval,
}

return t
Expand All @@ -96,6 +98,8 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwa

StartTime: rt.StartTime,
EndTime: rt.EndTime,

TaskTimeoutInterval: rct.TaskTimeoutInterval,
}

t.SetupStep = &gwapitypes.RunTaskResponseSetupStep{
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func GenExecutorTaskSpecData(r *types.Run, rt *types.RunTask, rc *types.RunConfi
Steps: rct.Steps,
CachePrefix: cachePrefix,
DockerRegistriesAuth: rct.DockerRegistriesAuth,
TaskTimeoutInterval: rct.TaskTimeoutInterval,
}

// calculate workspace operations
Expand Down
Loading

0 comments on commit 0810247

Please sign in to comment.