Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runservice: add tasks timeout #331

Merged
merged 1 commit into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Config struct {
Runs []*Run `json:"runs"`

DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"`
}

type RuntimeType string
Expand Down Expand Up @@ -113,6 +114,7 @@ type Run struct {
Tasks []*Task `json:"tasks"`
When *When `json:"when"`
DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"`
}

type Task struct {
Expand All @@ -128,6 +130,7 @@ type Task struct {
Approval bool `json:"approval"`
When *When `json:"when"`
DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"`
TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"`
}

type DependCondition string
Expand Down
14 changes: 14 additions & 0 deletions internal/runconfig/runconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,20 @@ func GenRunConfigTasks(uuid util.UUIDGenerator, c *config.Config, runName string
}
}

if c.TaskTimeoutInterval != nil {
t.TaskTimeoutInterval = c.TaskTimeoutInterval.Duration
}

// override with per run task timeout
if cr.TaskTimeoutInterval != nil {
t.TaskTimeoutInterval = cr.TaskTimeoutInterval.Duration
}

// override with per task timeout
if ct.TaskTimeoutInterval != nil {
t.TaskTimeoutInterval = ct.TaskTimeoutInterval.Duration
}

rcts[t.ID] = t
}

Expand Down
157 changes: 157 additions & 0 deletions internal/runconfig/runconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"reflect"
"testing"
"time"

"agola.io/agola/internal/config"
"agola.io/agola/internal/errors"
Expand Down Expand Up @@ -1358,6 +1359,162 @@ func TestGenRunConfig(t *testing.T) {
},
},
},
{
name: "test runconfig generation task timeout global",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
},
},
TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second},
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeoutInterval: 10 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
{
name: "test global task timeout override by run",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
TaskTimeoutInterval: &types.Duration{Duration: 15 * time.Second},
},
},
TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second},
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeoutInterval: 15 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
{
name: "test global task timeout override by task",
in: &config.Config{
Runs: []*config.Run{
&config.Run{
Name: "run01",
Tasks: []*config.Task{
&config.Task{
Name: "task01",
TaskTimeoutInterval: &types.Duration{Duration: 20 * time.Second},
Runtime: &config.Runtime{
Type: "pod",
Arch: "",
Containers: []*config.Container{
&config.Container{
Image: "image01",
},
},
},

Depends: []*config.Depend{},
WorkingDir: "",
Shell: "",
User: "",
},
},
TaskTimeoutInterval: &types.Duration{Duration: 15 * time.Second},
},
},
TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second},
},
out: map[string]*rstypes.RunConfigTask{
uuid.New("task01").String(): &rstypes.RunConfigTask{
ID: uuid.New("task01").String(),
Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{},
DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{},
TaskTimeoutInterval: 20 * time.Second,
Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"),
Containers: []*rstypes.Container{
{
Image: "image01",
Volumes: []rstypes.Volume{},
Environment: map[string]string{},
},
},
},
Environment: map[string]string{},
Skip: false,
Steps: rstypes.Steps{},
Shell: "/bin/sh -e",
},
},
},
}

for _, tt := range tests {
Expand Down
47 changes: 46 additions & 1 deletion internal/services/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ const (

// podCreationTimeout is the maximum time to wait for pod creation.
podCreationTimeout = time.Minute * 5

// tasksTimeoutCleanerInterval is the pause between two consecutive runs of the tasks timeout cleaner
tasksTimeoutCleanerInterval = time.Second * 2
)

var (
Expand Down Expand Up @@ -777,6 +780,7 @@ func (e *Executor) executeTask(rt *runningTask) {
}

rt.Lock()
rt.podStartTime = util.TimeP(time.Now())
et.Status.SetupStep.Phase = types.ExecutorTaskPhaseSuccess
et.Status.SetupStep.EndTime = util.TimeP(time.Now())
if err := e.sendExecutorTaskStatus(ctx, et); err != nil {
Expand All @@ -790,7 +794,10 @@ func (e *Executor) executeTask(rt *runningTask) {
rt.Lock()
if err != nil {
e.log.Err(err).Send()
if rt.et.Spec.Stop {
if rt.timedout {
et.Status.Phase = types.ExecutorTaskPhaseFailed
et.Status.Timedout = true
} else if rt.et.Spec.Stop {
et.Status.Phase = types.ExecutorTaskPhaseStopped
} else {
et.Status.Phase = types.ExecutorTaskPhaseFailed
Expand Down Expand Up @@ -1296,6 +1303,12 @@ type runningTask struct {

et *types.ExecutorTask
pod driver.Pod

// timedout is set to true by the tasks timeout cleaner when the task is cancelled for exceeding its timeout interval
timedout bool

// podStartTime is used to know when the pod is started
podStartTime *time.Time
}

func (r *runningTasks) get(rtID string) (*runningTask, bool) {
Expand Down Expand Up @@ -1496,6 +1509,7 @@ func (e *Executor) Run(ctx context.Context) error {
go e.podsCleanerLoop(ctx)
go e.tasksUpdaterLoop(ctx)
go e.tasksDataCleanerLoop(ctx)
go e.tasksTimeoutCleanerLoop(ctx)

go e.handleTasks(ctx, ch)

Expand Down Expand Up @@ -1525,3 +1539,34 @@ func (e *Executor) Run(ctx context.Context) error {

return nil
}

// tasksTimeoutCleanerLoop runs tasksTimeoutCleaner repeatedly, pausing for
// tasksTimeoutCleanerInterval after every pass, and returns once ctx is done.
// A pass is always executed before the context is checked, so the cleaner
// runs at least once even when ctx is already cancelled on entry.
func (e *Executor) tasksTimeoutCleanerLoop(ctx context.Context) {
	for {
		e.log.Debug().Msgf("tasksTimeoutCleaner")

		e.tasksTimeoutCleaner()

		// Keep the timer itself (not only its channel) so it can be stopped
		// when the context ends before the timer fires.
		timer := time.NewTimer(tasksTimeoutCleanerInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// tasksTimeoutCleaner performs a single pass over the currently running tasks
// and, for every task that has exceeded its timeout, flags it as timed out and
// calls its cancel function.
//
// A task is considered expired only when all of the following hold:
//   - its spec defines a non zero TaskTimeoutInterval,
//   - a pod start time has been recorded for it,
//   - the time elapsed since that start time is greater than the interval.
//
// The check and the update are done while holding the task lock.
func (e *Executor) tasksTimeoutCleaner() {
	for _, id := range e.runningTasks.ids() {
		task, found := e.runningTasks.get(id)
		if !found {
			// the task is no longer tracked, nothing to check
			continue
		}

		task.Lock()

		limit := task.et.Spec.TaskTimeoutInterval
		startedAt := task.podStartTime

		// startedAt is dereferenced only after the nil check thanks to
		// short-circuit evaluation.
		expired := limit != 0 && startedAt != nil && time.Since(*startedAt) > limit
		if expired {
			// set the flag before cancelling, both under the task lock
			task.timedout = true
			task.cancel()
		}

		task.Unlock()
	}
}
12 changes: 9 additions & 3 deletions internal/services/gateway/api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ func createRunResponse(r *rstypes.Run, rc *rstypes.RunConfig) *gwapitypes.RunRes

func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwapitypes.RunResponseTask {
t := &gwapitypes.RunResponseTask{
ID: rt.ID,
Name: rct.Name,
Status: rt.Status,
ID: rt.ID,
Name: rct.Name,
Status: rt.Status,
Timedout: rt.Timedout,

StartTime: rt.StartTime,
EndTime: rt.EndTime,
Expand All @@ -76,6 +77,8 @@ func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.Run

Level: rct.Level,
Depends: rct.Depends,

TaskTimeoutInterval: rct.TaskTimeoutInterval,
}

return t
Expand All @@ -86,6 +89,7 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwa
ID: rt.ID,
Name: rct.Name,
Status: rt.Status,
Timedout: rt.Timedout,
Containers: []gwapitypes.RunTaskResponseContainer{},

WaitingApproval: rt.WaitingApproval,
Expand All @@ -96,6 +100,8 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwa

StartTime: rt.StartTime,
EndTime: rt.EndTime,

TaskTimeoutInterval: rct.TaskTimeoutInterval,
}

t.SetupStep = &gwapitypes.RunTaskResponseSetupStep{
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func GenExecutorTaskSpecData(r *types.Run, rt *types.RunTask, rc *types.RunConfi
Steps: rct.Steps,
CachePrefix: cachePrefix,
DockerRegistriesAuth: rct.DockerRegistriesAuth,
TaskTimeoutInterval: rct.TaskTimeoutInterval,
}

// calculate workspace operations
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ func (s *Runservice) updateRunTaskStatus(et *types.ExecutorTask, r *types.Run) e
rt.Status = types.RunTaskStatusFailed
}

rt.Timedout = et.Status.Timedout
rt.SetupStep.Phase = et.Status.SetupStep.Phase
rt.SetupStep.StartTime = et.Status.SetupStep.StartTime
rt.SetupStep.EndTime = et.Status.SetupStep.EndTime
Expand Down
Loading