From 04b426b61b51ccb1d677571d1e80fecbc2055ee7 Mon Sep 17 00:00:00 2001 From: "alessandro.pinna" Date: Mon, 14 Mar 2022 01:52:09 -0700 Subject: [PATCH] runservice: add tasks timeout config/runconfig: add fields to set task timeout at the global, run and task level runservice/executor: handle tasks timeout --- internal/config/config.go | 3 + internal/runconfig/runconfig.go | 14 + internal/runconfig/runconfig_test.go | 157 ++++++++++ internal/services/executor/executor.go | 47 ++- internal/services/gateway/api/run.go | 12 +- internal/services/runservice/common/common.go | 1 + internal/services/runservice/scheduler.go | 1 + services/gateway/api/types/run.go | 16 +- services/runservice/types/executortask.go | 5 +- services/runservice/types/run.go | 5 + services/runservice/types/runconfig.go | 2 + tests/setup_test.go | 276 ++++++++++++++++++ 12 files changed, 529 insertions(+), 10 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index a815db979..9e99da0ae 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,6 +55,7 @@ type Config struct { Runs []*Run `json:"runs"` DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"` + TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"` } type RuntimeType string @@ -113,6 +114,7 @@ type Run struct { Tasks []*Task `json:"tasks"` When *When `json:"when"` DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"` + TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"` } type Task struct { @@ -128,6 +130,7 @@ type Task struct { Approval bool `json:"approval"` When *When `json:"when"` DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"` + TaskTimeoutInterval *types.Duration `json:"task_timeout_interval"` } type DependCondition string diff --git a/internal/runconfig/runconfig.go b/internal/runconfig/runconfig.go index 9ff2512c3..faf2aa41e 100644 --- 
a/internal/runconfig/runconfig.go +++ b/internal/runconfig/runconfig.go @@ -270,6 +270,20 @@ func GenRunConfigTasks(uuid util.UUIDGenerator, c *config.Config, runName string } } + if c.TaskTimeoutInterval != nil { + t.TaskTimeoutInterval = c.TaskTimeoutInterval.Duration + } + + // override with per run task timeout + if cr.TaskTimeoutInterval != nil { + t.TaskTimeoutInterval = cr.TaskTimeoutInterval.Duration + } + + // override with per task timeout + if ct.TaskTimeoutInterval != nil { + t.TaskTimeoutInterval = ct.TaskTimeoutInterval.Duration + } + rcts[t.ID] = t } diff --git a/internal/runconfig/runconfig_test.go b/internal/runconfig/runconfig_test.go index 4cde8ce62..9d51ffc36 100644 --- a/internal/runconfig/runconfig_test.go +++ b/internal/runconfig/runconfig_test.go @@ -18,6 +18,7 @@ import ( "fmt" "reflect" "testing" + "time" "agola.io/agola/internal/config" "agola.io/agola/internal/errors" @@ -1358,6 +1359,162 @@ func TestGenRunConfig(t *testing.T) { }, }, }, + { + name: "test runconfig generation task timeout global", + in: &config.Config{ + Runs: []*config.Run{ + &config.Run{ + Name: "run01", + Tasks: []*config.Task{ + &config.Task{ + Name: "task01", + Runtime: &config.Runtime{ + Type: "pod", + Arch: "", + Containers: []*config.Container{ + &config.Container{ + Image: "image01", + }, + }, + }, + + Depends: []*config.Depend{}, + WorkingDir: "", + Shell: "", + User: "", + }, + }, + }, + }, + TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second}, + }, + out: map[string]*rstypes.RunConfigTask{ + uuid.New("task01").String(): &rstypes.RunConfigTask{ + ID: uuid.New("task01").String(), + Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{}, + DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{}, + TaskTimeoutInterval: 10 * time.Second, + Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"), + Containers: []*rstypes.Container{ + { + Image: "image01", + Volumes: []rstypes.Volume{}, + Environment: map[string]string{}, + }, + }, 
+ }, + Environment: map[string]string{}, + Skip: false, + Steps: rstypes.Steps{}, + Shell: "/bin/sh -e", + }, + }, + }, + { + name: "test global task timeout override by run", + in: &config.Config{ + Runs: []*config.Run{ + &config.Run{ + Name: "run01", + Tasks: []*config.Task{ + &config.Task{ + Name: "task01", + Runtime: &config.Runtime{ + Type: "pod", + Arch: "", + Containers: []*config.Container{ + &config.Container{ + Image: "image01", + }, + }, + }, + + Depends: []*config.Depend{}, + WorkingDir: "", + Shell: "", + User: "", + }, + }, + TaskTimeoutInterval: &types.Duration{Duration: 15 * time.Second}, + }, + }, + TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second}, + }, + out: map[string]*rstypes.RunConfigTask{ + uuid.New("task01").String(): &rstypes.RunConfigTask{ + ID: uuid.New("task01").String(), + Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{}, + DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{}, + TaskTimeoutInterval: 15 * time.Second, + Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"), + Containers: []*rstypes.Container{ + { + Image: "image01", + Volumes: []rstypes.Volume{}, + Environment: map[string]string{}, + }, + }, + }, + Environment: map[string]string{}, + Skip: false, + Steps: rstypes.Steps{}, + Shell: "/bin/sh -e", + }, + }, + }, + { + name: "test global task timeout override by task", + in: &config.Config{ + Runs: []*config.Run{ + &config.Run{ + Name: "run01", + Tasks: []*config.Task{ + &config.Task{ + Name: "task01", + TaskTimeoutInterval: &types.Duration{Duration: 20 * time.Second}, + Runtime: &config.Runtime{ + Type: "pod", + Arch: "", + Containers: []*config.Container{ + &config.Container{ + Image: "image01", + }, + }, + }, + + Depends: []*config.Depend{}, + WorkingDir: "", + Shell: "", + User: "", + }, + }, + TaskTimeoutInterval: &types.Duration{Duration: 15 * time.Second}, + }, + }, + TaskTimeoutInterval: &types.Duration{Duration: 10 * time.Second}, + }, + out: 
map[string]*rstypes.RunConfigTask{ + uuid.New("task01").String(): &rstypes.RunConfigTask{ + ID: uuid.New("task01").String(), + Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{}, + DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{}, + TaskTimeoutInterval: 20 * time.Second, + Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"), + Containers: []*rstypes.Container{ + { + Image: "image01", + Volumes: []rstypes.Volume{}, + Environment: map[string]string{}, + }, + }, + }, + Environment: map[string]string{}, + Skip: false, + Steps: rstypes.Steps{}, + Shell: "/bin/sh -e", + }, + }, + }, } for _, tt := range tests { diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go index 18df2a664..508be336e 100644 --- a/internal/services/executor/executor.go +++ b/internal/services/executor/executor.go @@ -54,6 +54,9 @@ const ( // podCreationTimeout is the maximum time to wait for pod creation. podCreationTimeout = time.Minute * 5 + + // tasksTimeoutCleanerInterval is the interval between two runs of the tasks timeout cleaner + tasksTimeoutCleanerInterval = time.Second * 2 ) var ( @@ -777,6 +780,7 @@ func (e *Executor) executeTask(rt *runningTask) { } rt.Lock() + rt.podStartTime = util.TimeP(time.Now()) et.Status.SetupStep.Phase = types.ExecutorTaskPhaseSuccess et.Status.SetupStep.EndTime = util.TimeP(time.Now()) if err := e.sendExecutorTaskStatus(ctx, et); err != nil { @@ -790,7 +794,10 @@ func (e *Executor) executeTask(rt *runningTask) { rt.Lock() if err != nil { e.log.Err(err).Send() - if rt.et.Spec.Stop { + if rt.timedout { + et.Status.Phase = types.ExecutorTaskPhaseFailed + et.Status.Timedout = true + } else if rt.et.Spec.Stop { et.Status.Phase = types.ExecutorTaskPhaseStopped } else { et.Status.Phase = types.ExecutorTaskPhaseFailed @@ -1296,6 +1303,12 @@ type runningTask struct { et *types.ExecutorTask pod driver.Pod + + // timedout is used to know when the task is timedout + timedout bool + + // podStartTime is 
used to know when the pod is started + podStartTime *time.Time } func (r *runningTasks) get(rtID string) (*runningTask, bool) { @@ -1496,6 +1509,7 @@ func (e *Executor) Run(ctx context.Context) error { go e.podsCleanerLoop(ctx) go e.tasksUpdaterLoop(ctx) go e.tasksDataCleanerLoop(ctx) + go e.tasksTimeoutCleanerLoop(ctx) go e.handleTasks(ctx, ch) @@ -1525,3 +1539,34 @@ func (e *Executor) Run(ctx context.Context) error { return nil } + +func (e *Executor) tasksTimeoutCleanerLoop(ctx context.Context) { + for { + e.log.Debug().Msgf("tasksTimeoutCleaner") + + e.tasksTimeoutCleaner() + + sleepCh := time.NewTimer(tasksTimeoutCleanerInterval).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } + } +} + +func (e *Executor) tasksTimeoutCleaner() { + for _, rtID := range e.runningTasks.ids() { + rt, ok := e.runningTasks.get(rtID) + if !ok { + continue + } + + rt.Lock() + if rt.et.Spec.TaskTimeoutInterval != 0 && rt.podStartTime != nil && time.Since(*rt.podStartTime) > rt.et.Spec.TaskTimeoutInterval { + rt.timedout = true + rt.cancel() + } + rt.Unlock() + } +} diff --git a/internal/services/gateway/api/run.go b/internal/services/gateway/api/run.go index a16e946e7..78df74dcd 100644 --- a/internal/services/gateway/api/run.go +++ b/internal/services/gateway/api/run.go @@ -63,9 +63,10 @@ func createRunResponse(r *rstypes.Run, rc *rstypes.RunConfig) *gwapitypes.RunRes func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwapitypes.RunResponseTask { t := &gwapitypes.RunResponseTask{ - ID: rt.ID, - Name: rct.Name, - Status: rt.Status, + ID: rt.ID, + Name: rct.Name, + Status: rt.Status, + Timedout: rt.Timedout, StartTime: rt.StartTime, EndTime: rt.EndTime, @@ -76,6 +77,8 @@ func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.Run Level: rct.Level, Depends: rct.Depends, + + TaskTimeoutInterval: rct.TaskTimeoutInterval, } return t @@ -86,6 +89,7 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct 
*rstypes.RunConfigTask) *gwa ID: rt.ID, Name: rct.Name, Status: rt.Status, + Timedout: rt.Timedout, Containers: []gwapitypes.RunTaskResponseContainer{}, WaitingApproval: rt.WaitingApproval, @@ -96,6 +100,8 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwa StartTime: rt.StartTime, EndTime: rt.EndTime, + + TaskTimeoutInterval: rct.TaskTimeoutInterval, } t.SetupStep = &gwapitypes.RunTaskResponseSetupStep{ diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go index 1742b46ad..db0b541c3 100644 --- a/internal/services/runservice/common/common.go +++ b/internal/services/runservice/common/common.go @@ -144,6 +144,7 @@ func GenExecutorTaskSpecData(r *types.Run, rt *types.RunTask, rc *types.RunConfi Steps: rct.Steps, CachePrefix: cachePrefix, DockerRegistriesAuth: rct.DockerRegistriesAuth, + TaskTimeoutInterval: rct.TaskTimeoutInterval, } // calculate workspace operations diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 20fe79187..5052c5fff 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -796,6 +796,7 @@ func (s *Runservice) updateRunTaskStatus(et *types.ExecutorTask, r *types.Run) e rt.Status = types.RunTaskStatusFailed } + rt.Timedout = et.Status.Timedout rt.SetupStep.Phase = et.Status.SetupStep.Phase rt.SetupStep.StartTime = et.Status.SetupStep.StartTime rt.SetupStep.EndTime = et.Status.SetupStep.EndTime diff --git a/services/gateway/api/types/run.go b/services/gateway/api/types/run.go index 0cedf640c..84dd98681 100644 --- a/services/gateway/api/types/run.go +++ b/services/gateway/api/types/run.go @@ -61,11 +61,12 @@ type RunResponse struct { } type RunResponseTask struct { - ID string `json:"id"` - Name string `json:"name"` - Status rstypes.RunTaskStatus `json:"status"` - Level int `json:"level"` - Depends map[string]*rstypes.RunConfigTaskDepend `json:"depends"` + ID string 
`json:"id"` + Name string `json:"name"` + Status rstypes.RunTaskStatus `json:"status"` + Timedout bool `json:"timedout"` + Level int `json:"level"` + Depends map[string]*rstypes.RunConfigTaskDepend `json:"depends"` WaitingApproval bool `json:"waiting_approval"` Approved bool `json:"approved"` @@ -73,12 +74,15 @@ type RunResponseTask struct { StartTime *time.Time `json:"start_time"` EndTime *time.Time `json:"end_time"` + + TaskTimeoutInterval time.Duration `json:"task_timeout_interval"` } type RunTaskResponse struct { ID string `json:"id"` Name string `json:"name"` Status rstypes.RunTaskStatus `json:"status"` + Timedout bool `json:"timedout"` Containers []RunTaskResponseContainer `json:"containers"` WaitingApproval bool `json:"waiting_approval"` @@ -90,6 +94,8 @@ type RunTaskResponse struct { StartTime *time.Time `json:"start_time"` EndTime *time.Time `json:"end_time"` + + TaskTimeoutInterval time.Duration `json:"task_timeout_interval"` } type RunTaskResponseContainer struct { diff --git a/services/runservice/types/executortask.go b/services/runservice/types/executortask.go index 87abc3e8c..478fb924f 100644 --- a/services/runservice/types/executortask.go +++ b/services/runservice/types/executortask.go @@ -79,12 +79,15 @@ type ExecutorTaskSpecData struct { CachePrefix string `json:"cache_prefix,omitempty"` Steps Steps `json:"steps,omitempty"` + + TaskTimeoutInterval time.Duration `json:"task_timeout_interval"` } type ExecutorTaskStatus struct { ID string `json:"id,omitempty"` - Phase ExecutorTaskPhase `json:"phase,omitempty"` + Phase ExecutorTaskPhase `json:"phase,omitempty"` + Timedout bool `json:"timedout,omitempty"` FailError string `json:"fail_error,omitempty"` diff --git a/services/runservice/types/run.go b/services/runservice/types/run.go index 2e6802341..540cb4904 100644 --- a/services/runservice/types/run.go +++ b/services/runservice/types/run.go @@ -201,6 +201,9 @@ type RunTask struct { // there're no executor tasks scheduled Status RunTaskStatus 
`json:"status,omitempty"` + // Timedout reports whether the task has timed out + Timedout bool `json:"timedout,omitempty"` + // Annotations contain custom task annotations // these are opaque to the runservice and used for multiple pourposes. For // example to stores task approval metadata. @@ -220,6 +223,8 @@ type RunTask struct { StartTime *time.Time `json:"start_time,omitempty"` EndTime *time.Time `json:"end_time,omitempty"` + + TaskTimeoutInterval *time.Duration `json:"task_timeout_interval"` } func (rt *RunTask) LogsFetchFinished() bool { diff --git a/services/runservice/types/runconfig.go b/services/runservice/types/runconfig.go index 33e2db83b..93291204b 100644 --- a/services/runservice/types/runconfig.go +++ b/services/runservice/types/runconfig.go @@ -2,6 +2,7 @@ package types import ( "encoding/json" + "time" "agola.io/agola/internal/errors" stypes "agola.io/agola/services/types" @@ -80,6 +81,7 @@ type RunConfigTask struct { NeedsApproval bool `json:"needs_approval,omitempty"` Skip bool `json:"skip,omitempty"` DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"` + TaskTimeoutInterval time.Duration `json:"task_timeout_interval"` } func (rct *RunConfigTask) DeepCopy() *RunConfigTask { diff --git a/tests/setup_test.go b/tests/setup_test.go index 947b7c314..4f29ce9cb 100644 --- a/tests/setup_test.go +++ b/tests/setup_test.go @@ -41,6 +41,7 @@ import ( "agola.io/agola/internal/util" gwapitypes "agola.io/agola/services/gateway/api/types" gwclient "agola.io/agola/services/gateway/client" + "agola.io/agola/services/runservice/types" rstypes "agola.io/agola/services/runservice/types" "code.gitea.io/sdk/gitea" @@ -1928,3 +1929,278 @@ func TestUserOrgs(t *testing.T) { t.Fatalf("user orgs mismatch (-want +got):\n%s", diff) } } + +func TestTaskTimeout(t *testing.T) { + tests := []struct { + name string + config string + tasksResultExpected map[string]rstypes.RunTaskStatus + taskTimedoutExpected map[string]bool + }{ + { + name: "test timeout 
string value", + tasksResultExpected: map[string]rstypes.RunTaskStatus{"task01": types.RunTaskStatusFailed}, + taskTimedoutExpected: map[string]bool{"task01": true}, + config: ` + { + runs: [ + { + name: 'run01', + tasks: [ + { + name: 'task01', + runtime: { + containers: [ + { + image: 'alpine/git', + }, + ], + }, + task_timeout_interval: "15s", + steps: [ + { type: 'run', command: 'sleep 30' }, + ], + }, + ], + }, + ], + } + `, + }, + { + name: "test timeout int value", + tasksResultExpected: map[string]rstypes.RunTaskStatus{"task01": types.RunTaskStatusFailed}, + taskTimedoutExpected: map[string]bool{"task01": true}, + config: ` + { + runs: [ + { + name: 'run01', + tasks: [ + { + name: 'task01', + runtime: { + containers: [ + { + image: 'alpine/git', + }, + ], + }, + task_timeout_interval: 15000000000, + steps: [ + { type: 'run', command: 'sleep 30' }, + ], + }, + ], + }, + ], + } + `, + }, + { + name: "test timeout child timeout", + tasksResultExpected: map[string]rstypes.RunTaskStatus{"task01": types.RunTaskStatusSuccess, "task02": types.RunTaskStatusFailed}, + taskTimedoutExpected: map[string]bool{"task01": false, "task02": true}, + config: ` + { + runs: [ + { + name: 'run01', + tasks: [ + { + name: 'task01', + runtime: { + containers: [ + { + image: 'alpine/git', + }, + ], + }, + steps: [ + { type: 'run', command: 'sleep 30' }, + ], + }, + { + name: 'task02', + depends: ['task01'], + runtime: { + containers: [ + { + image: 'alpine/git', + }, + ], + }, + task_timeout_interval: "15s", + steps: [ + { type: 'run', command: 'sleep 30' }, + ], + }, + ], + }, + ], + } + `, + }, + { + name: "test timeout parent timeout", + tasksResultExpected: map[string]rstypes.RunTaskStatus{"task01": types.RunTaskStatusFailed, "task02": types.RunTaskStatusSkipped}, + taskTimedoutExpected: map[string]bool{"task01": true, "task02": false}, + config: ` + { + runs: [ + { + name: 'run01', + tasks: [ + { + name: 'task01', + runtime: { + containers: [ + { + image: 'alpine/git', + }, + ], 
+ }, + task_timeout_interval: "15s", + steps: [ + { type: 'run', command: 'sleep 30' }, + ], + }, + { + name: 'task02', + depends: ['task01'], + runtime: { + containers: [ + { + image: 'alpine/git', + }, + ], + }, + steps: [ + { type: 'run', command: 'sleep 30' }, + ], + }, + ], + }, + ], + } + `, + }, + { + name: "test timeout parent and child timeout", + tasksResultExpected: map[string]rstypes.RunTaskStatus{"task01": types.RunTaskStatusFailed, "task02": types.RunTaskStatusSkipped}, + taskTimedoutExpected: map[string]bool{"task01": true, "task02": false}, + config: ` + { + runs: [ + { + name: 'run01', + tasks: [ + { + name: 'task01', + runtime: { + containers: [ + { + image: 'alpine/git', + }, + ], + }, + task_timeout_interval: "15s", + steps: [ + { type: 'run', command: 'sleep 30' }, + ], + }, + { + name: 'task02', + depends: ['task01'], + runtime: { + containers: [ + { + image: 'alpine/git', + }, + ], + }, + task_timeout_interval: "15s", + steps: [ + { type: 'run', command: 'sleep 30' }, + ], + }, + ], + }, + ], + } + `, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dir := t.TempDir() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, c := setup(ctx, t, dir, false) + + gwClient := gwclient.NewClient(c.Gateway.APIExposedURL, "admintoken") + user, _, err := gwClient.CreateUser(ctx, &gwapitypes.CreateUserRequest{UserName: agolaUser01}) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + t.Logf("created agola user: %s", user.UserName) + + token := createAgolaUserToken(ctx, t, c) + + // From now use the user token + gwClient = gwclient.NewClient(c.Gateway.APIExposedURL, token) + + directRun(t, dir, tt.config, ConfigFormatJsonnet, c.Gateway.APIExposedURL, token) + + time.Sleep(30 * time.Second) + + _ = testutil.Wait(120*time.Second, func() (bool, error) { + run, _, err := gwClient.GetUserRun(ctx, user.ID, 1) + if err != nil { + return false, nil + } + + if run == nil { + return false, nil + } 
+ + if run.Phase != rstypes.RunPhaseFinished { + return false, nil + } + + return true, nil + }) + + run, _, err := gwClient.GetUserRun(ctx, user.ID, 1) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + t.Logf("runs: %s", util.Dump(run)) + + if run == nil { + t.Fatalf("user run not found") + } + if run.Phase != rstypes.RunPhaseFinished { + t.Fatalf("expected run finished got: %s", run.Phase) + } + if run.Result != rstypes.RunResultFailed { + t.Fatalf("expected run failed") + } + if len(run.Tasks) != len(tt.tasksResultExpected) { + t.Fatalf("expected 1 task got: %d", len(run.Tasks)) + } + for _, task := range run.Tasks { + if task.Status != tt.tasksResultExpected[task.Name] { + t.Fatalf("expected task status %s got: %s", tt.tasksResultExpected[task.Name], task.Status) + } + if task.Timedout != tt.taskTimedoutExpected[task.Name] { + t.Fatalf("expected task timedout %v got: %v", tt.taskTimedoutExpected[task.Name], task.Timedout) + } + } + }) + } +}