Skip to content

Commit

Permalink
config/runconfig: add fields to set task timeout at the global, run a…
Browse files Browse the repository at this point in the history
…nd task level

runservice/executor: handle tasks timeout
  • Loading branch information
alessandro-sorint committed Jul 15, 2022
1 parent 20ca985 commit 3e4d935
Show file tree
Hide file tree
Showing 12 changed files with 529 additions and 10 deletions.
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Config struct {
Runs []*Run `json:"runs"`

DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"`
}

type RuntimeType string
Expand Down Expand Up @@ -113,6 +114,7 @@ type Run struct {
Tasks []*Task `json:"tasks"`
When *When `json:"when"`
DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"`
}

type Task struct {
Expand All @@ -128,6 +130,7 @@ type Task struct {
Approval bool `json:"approval"`
When *When `json:"when"`
DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"`
}

type DependCondition string
Expand Down
14 changes: 14 additions & 0 deletions internal/runconfig/runconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,20 @@ func GenRunConfigTasks(uuid util.UUIDGenerator, c *config.Config, runName string
}
}

if c.TaskTimeoutInterval != nil {
t.TaskTimeoutInterval = c.TaskTimeoutInterval.Duration
}

// override with per run task timeout
if cr.TaskTimeoutInterval != nil {
t.TaskTimeoutInterval = cr.TaskTimeoutInterval.Duration
}

// override with per task timeout
if ct.TaskTimeoutInterval != nil {
t.TaskTimeoutInterval = ct.TaskTimeoutInterval.Duration
}

rcts[t.ID] = t
}

Expand Down
157 changes: 157 additions & 0 deletions internal/runconfig/runconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"reflect"
"testing"
"time"

"agola.io/agola/internal/config"
"agola.io/agola/internal/errors"
Expand Down Expand Up @@ -1358,6 +1359,162 @@ func TestGenRunConfig(t *testing.T) {
},
},
},
{
name: "test runconfig generation task timeout global",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
},
},
TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second},
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeoutInterval: 10 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
{
name: "test global task timeout override by run",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
TaskTimeoutInterval: &types.Duration{Duration: 15 * time.Second},
},
},
TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second},
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeoutInterval: 15 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
{
name: "test global task timeout override by task",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
TaskTimeoutInterval: &types.Duration{Duration: 20 * time.Second},
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
TaskTimeoutInterval: &types.Duration{Duration: 15 * time.Second},
},
},
TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second},
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeoutInterval: 20 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
}

for _, tt := range tests {
Expand Down
47 changes: 46 additions & 1 deletion internal/services/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ const (

// podCreationTimeout is the maximum time to wait for pod creation.
podCreationTimeout = time.Minute * 5

// tasksTimeoutCleanerInterval is the maximum time to wait for tasks timeout cleaner
tasksTimeoutCleanerInterval = time.Second * 2
)

var (
Expand Down Expand Up @@ -777,6 +780,7 @@ func (e *Executor) executeTask(rt *runningTask) {
}

rt.Lock()
rt.podStartTime = util.TimeP(time.Now())
et.Status.SetupStep.Phase = types.ExecutorTaskPhaseSuccess
et.Status.SetupStep.EndTime = util.TimeP(time.Now())
if err := e.sendExecutorTaskStatus(ctx, et); err != nil {
Expand All @@ -790,7 +794,10 @@ func (e *Executor) executeTask(rt *runningTask) {
rt.Lock()
if err != nil {
e.log.Err(err).Send()
if rt.et.Spec.Stop {
if rt.timedout {
et.Status.Phase = types.ExecutorTaskPhaseFailed
et.Status.Timedout = true
} else if rt.et.Spec.Stop {
et.Status.Phase = types.ExecutorTaskPhaseStopped
} else {
et.Status.Phase = types.ExecutorTaskPhaseFailed
Expand Down Expand Up @@ -1296,6 +1303,12 @@ type runningTask struct {

et *types.ExecutorTask
pod driver.Pod

// timedout is used to know when the task is timedout
timedout bool

// podStartTime is used to know when the pod is started
podStartTime *time.Time
}

func (r *runningTasks) get(rtID string) (*runningTask, bool) {
Expand Down Expand Up @@ -1496,6 +1509,7 @@ func (e *Executor) Run(ctx context.Context) error {
go e.podsCleanerLoop(ctx)
go e.tasksUpdaterLoop(ctx)
go e.tasksDataCleanerLoop(ctx)
go e.tasksTimeoutCleanerLoop(ctx)

go e.handleTasks(ctx, ch)

Expand Down Expand Up @@ -1525,3 +1539,34 @@ func (e *Executor) Run(ctx context.Context) error {

return nil
}

func (e *Executor) tasksTimeoutCleanerLoop(ctx context.Context) {
for {
e.log.Debug().Msgf("tasksTimeoutCleaner")

e.tasksTimeoutCleaner()

sleepCh := time.NewTimer(tasksTimeoutCleanerInterval).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}

func (e *Executor) tasksTimeoutCleaner() {
for _, rtID := range e.runningTasks.ids() {
rt, ok := e.runningTasks.get(rtID)
if !ok {
continue
}

rt.Lock()
if rt.et.Spec.TaskTimeoutInterval != 0 && rt.podStartTime != nil && time.Since(*rt.podStartTime) > rt.et.Spec.TaskTimeoutInterval {
rt.timedout = true
rt.cancel()
}
rt.Unlock()
}
}
12 changes: 9 additions & 3 deletions internal/services/gateway/api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ func createRunResponse(r *rstypes.Run, rc *rstypes.RunConfig) *gwapitypes.RunRes

func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwapitypes.RunResponseTask {
t := &gwapitypes.RunResponseTask{
ID: rt.ID,
Name: rct.Name,
Status: rt.Status,
ID: rt.ID,
Name: rct.Name,
Status: rt.Status,
Timedout: rt.Timedout,

StartTime: rt.StartTime,
EndTime: rt.EndTime,
Expand All @@ -76,6 +77,8 @@ func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.Run

Level: rct.Level,
Depends: rct.Depends,

TaskTimeoutInterval: rct.TaskTimeoutInterval,
}

return t
Expand All @@ -86,6 +89,7 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwa
ID: rt.ID,
Name: rct.Name,
Status: rt.Status,
Timedout: rt.Timedout,
Containers: []gwapitypes.RunTaskResponseContainer{},

WaitingApproval: rt.WaitingApproval,
Expand All @@ -96,6 +100,8 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwa

StartTime: rt.StartTime,
EndTime: rt.EndTime,

TaskTimeoutInterval: rct.TaskTimeoutInterval,
}

t.SetupStep = &gwapitypes.RunTaskResponseSetupStep{
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func GenExecutorTaskSpecData(r *types.Run, rt *types.RunTask, rc *types.RunConfi
Steps: rct.Steps,
CachePrefix: cachePrefix,
DockerRegistriesAuth: rct.DockerRegistriesAuth,
TaskTimeoutInterval: rct.TaskTimeoutInterval,
}

// calculate workspace operations
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ func (s *Runservice) updateRunTaskStatus(et *types.ExecutorTask, r *types.Run) e
rt.Status = types.RunTaskStatusFailed
}

rt.Timedout = et.Status.Timedout
rt.SetupStep.Phase = et.Status.SetupStep.Phase
rt.SetupStep.StartTime = et.Status.SetupStep.StartTime
rt.SetupStep.EndTime = et.Status.SetupStep.EndTime
Expand Down
Loading

0 comments on commit 3e4d935

Please sign in to comment.