From a6a8d61884ff9e55c570b30b8a922af25b7b2c33 Mon Sep 17 00:00:00 2001 From: "alessandro.pinna" Date: Mon, 14 Mar 2022 01:52:09 -0700 Subject: [PATCH] runConfig: added task timeout in global, run and task configuration. executor: added tasksTimeoutCleaner for managing task timeout. scheduler: updated executorTaskCleaner for managing task timeout and report it to the executor --- internal/config/config.go | 4 + internal/migration/runservice/types/types.go | 3 +- internal/runconfig/runconfig.go | 14 ++ internal/runconfig/runconfig_test.go | 157 ++++++++++++++++++ internal/services/executor/executor.go | 65 +++++++- internal/services/gateway/api/run.go | 4 + internal/services/runservice/common/common.go | 1 + internal/services/runservice/scheduler.go | 37 ++++- services/gateway/api/types/run.go | 4 + services/runservice/types/executortask.go | 7 +- services/runservice/types/run.go | 5 +- services/runservice/types/runconfig.go | 2 + tests/setup_test.go | 94 +++++++++++ 13 files changed, 390 insertions(+), 7 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index a815db979..a1605c79e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -19,6 +19,7 @@ import ( "fmt" "regexp" "strings" + "time" "agola.io/agola/internal/errors" itypes "agola.io/agola/internal/services/types" @@ -55,6 +56,7 @@ type Config struct { Runs []*Run `json:"runs"` DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"` + TaskTimeout *time.Duration `json:"task_timeout"` } type RuntimeType string @@ -113,6 +115,7 @@ type Run struct { Tasks []*Task `json:"tasks"` When *When `json:"when"` DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"` + TaskTimeout *time.Duration `json:"task_timeout"` } type Task struct { @@ -128,6 +131,7 @@ type Task struct { Approval bool `json:"approval"` When *When `json:"when"` DockerRegistriesAuth map[string]*DockerRegistryAuth `json:"docker_registries_auth"` + TaskTimeout *time.Duration `json:"task_timeout"` } type DependCondition string diff --git a/internal/migration/runservice/types/types.go b/internal/migration/runservice/types/types.go index 2b7ebd2fb..234cc2678 100644 --- a/internal/migration/runservice/types/types.go +++ b/internal/migration/runservice/types/types.go @@ -206,10 +206,11 @@ const ( RunTaskStatusStopped RunTaskStatus = "stopped" RunTaskStatusSuccess RunTaskStatus = "success" RunTaskStatusFailed RunTaskStatus = "failed" + RunTaskStatusTimeout RunTaskStatus = "timeout" ) func (s RunTaskStatus) IsFinished() bool { - return s == RunTaskStatusCancelled || s == RunTaskStatusSkipped || s == RunTaskStatusStopped || s == RunTaskStatusSuccess || s == RunTaskStatusFailed + return s == RunTaskStatusCancelled || s == RunTaskStatusSkipped || s == RunTaskStatusStopped || s == RunTaskStatusSuccess || s == RunTaskStatusFailed || s == RunTaskStatusTimeout } type RunTaskFetchPhase string diff --git a/internal/runconfig/runconfig.go b/internal/runconfig/runconfig.go index 9ff2512c3..aa7f80f5d 100644 --- a/internal/runconfig/runconfig.go +++ b/internal/runconfig/runconfig.go @@ -270,6 +270,20 @@ func GenRunConfigTasks(uuid util.UUIDGenerator, c *config.Config, runName string } } + if c.TaskTimeout != nil { + t.TaskTimeout = *c.TaskTimeout + } + + // override with per run task timeout + if cr.TaskTimeout != nil { + t.TaskTimeout = *cr.TaskTimeout + } + + // override with per task timeout + if ct.TaskTimeout != nil { + t.TaskTimeout = *ct.TaskTimeout + } + rcts[t.ID] = t } diff --git a/internal/runconfig/runconfig_test.go b/internal/runconfig/runconfig_test.go index 4cde8ce62..4e41058a8 100644 --- a/internal/runconfig/runconfig_test.go +++ b/internal/runconfig/runconfig_test.go @@ -18,6 +18,7 @@ import ( "fmt" "reflect" "testing" + "time" "agola.io/agola/internal/config" "agola.io/agola/internal/errors" @@ -1358,6 +1359,162 @@ func TestGenRunConfig(t *testing.T) { }, }, }, + { + name: "test runconfig generation task timeout global", + in: &config.Config{ + Runs: []*config.Run{ + &config.Run{ + Name: "run01", + Tasks: []*config.Task{ + &config.Task{ + Name: "task01", + Runtime: &config.Runtime{ + Type: "pod", + Arch: "", + Containers: []*config.Container{ + &config.Container{ + Image: "image01", + }, + }, + }, + + Depends: []*config.Depend{}, + WorkingDir: "", + Shell: "", + User: "", + }, + }, + }, + }, + TaskTimeout: util.DurationP(10 * time.Second), + }, + out: map[string]*rstypes.RunConfigTask{ + uuid.New("task01").String(): &rstypes.RunConfigTask{ + ID: uuid.New("task01").String(), + Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{}, + DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{}, + TaskTimeout: 10 * time.Second, + Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"), + Containers: []*rstypes.Container{ + { + Image: "image01", + Volumes: []rstypes.Volume{}, + Environment: map[string]string{}, + }, + }, + }, + Environment: map[string]string{}, + Skip: false, + Steps: rstypes.Steps{}, + Shell: "/bin/sh -e", + }, + }, + }, + { + name: "test global task timeout override by run", + in: &config.Config{ + Runs: []*config.Run{ + &config.Run{ + Name: "run01", + Tasks: []*config.Task{ + &config.Task{ + Name: "task01", + Runtime: &config.Runtime{ + Type: "pod", + Arch: "", + Containers: []*config.Container{ + &config.Container{ + Image: "image01", + }, + }, + }, + + Depends: []*config.Depend{}, + WorkingDir: "", + Shell: "", + User: "", + }, + }, + TaskTimeout: util.DurationP(15 * time.Second), + }, + }, + TaskTimeout: util.DurationP(10 * time.Second), + }, + out: map[string]*rstypes.RunConfigTask{ + uuid.New("task01").String(): &rstypes.RunConfigTask{ + ID: uuid.New("task01").String(), + Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{}, + DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{}, + TaskTimeout: 15 * time.Second, + Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"), + Containers: []*rstypes.Container{ + { + Image: "image01", + Volumes: []rstypes.Volume{}, + Environment: map[string]string{}, + }, + }, + }, + Environment: map[string]string{}, + Skip: false, + Steps: rstypes.Steps{}, + Shell: "/bin/sh -e", + }, + }, + }, + { + name: "test global task timeout override by task", + in: &config.Config{ + Runs: []*config.Run{ + &config.Run{ + Name: "run01", + Tasks: []*config.Task{ + &config.Task{ + Name: "task01", + TaskTimeout: util.DurationP(20 * time.Second), + Runtime: &config.Runtime{ + Type: "pod", + Arch: "", + Containers: []*config.Container{ + &config.Container{ + Image: "image01", + }, + }, + }, + + Depends: []*config.Depend{}, + WorkingDir: "", + Shell: "", + User: "", + }, + }, + TaskTimeout: util.DurationP(15 * time.Second), + }, + }, + TaskTimeout: util.DurationP(10 * time.Second), + }, + out: map[string]*rstypes.RunConfigTask{ + uuid.New("task01").String(): &rstypes.RunConfigTask{ + ID: uuid.New("task01").String(), + Name: "task01", Depends: map[string]*rstypes.RunConfigTaskDepend{}, + DockerRegistriesAuth: map[string]rstypes.DockerRegistryAuth{}, + TaskTimeout: 20 * time.Second, + Runtime: &rstypes.Runtime{Type: rstypes.RuntimeType("pod"), + Containers: []*rstypes.Container{ + { + Image: "image01", + Volumes: []rstypes.Volume{}, + Environment: map[string]string{}, + }, + }, + }, + Environment: map[string]string{}, + Skip: false, + Steps: rstypes.Steps{}, + Shell: "/bin/sh -e", + }, + }, + }, } for _, tt := range tests { diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go index 885390c31..308eb42f4 100644 --- a/internal/services/executor/executor.go +++ b/internal/services/executor/executor.go @@ -784,7 +784,9 @@ func (e *Executor) executeTask(rt *runningTask) { rt.Lock() if err != nil { e.log.Err(err).Send() - if rt.et.Spec.Stop { + if rt.et.Spec.Timeout { + et.Status.Phase = types.ExecutorTaskPhaseTimeout + } else if rt.et.Spec.Stop { et.Status.Phase = types.ExecutorTaskPhaseStopped } else { et.Status.Phase = types.ExecutorTaskPhaseFailed @@ -956,14 +958,14 @@ func (e *Executor) executeTaskSteps(ctx context.Context, rt *runningTask, pod dr rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseSuccess if err != nil { - if rt.et.Spec.Stop { + if rt.et.Spec.Stop || rt.et.Spec.Timeout { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseStopped } else { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed } serr = errors.Wrapf(err, "failed to execute step %s", util.Dump(step)) } else if exitCode != 0 { - if rt.et.Spec.Stop { + if rt.et.Spec.Stop || rt.et.Spec.Timeout { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseStopped } else { rt.et.Status.Steps[i].Phase = types.ExecutorTaskPhaseFailed @@ -1156,10 +1158,21 @@ func (e *Executor) taskUpdater(ctx context.Context, et *types.ExecutorTask) { if rt != nil { rt.Lock() // update running task Spec.Stop value only when there's a transitions from false to true, + // update running task Spec.Timeout value only when there's a transitions from false to true, // other spec values cannot change once the task has been scheduled if !rt.et.Spec.Stop && et.Spec.Stop { rt.et.Spec.Stop = et.Spec.Stop + // cancel the running task + rt.cancel() + } else if !rt.et.Spec.Timeout && et.Spec.Timeout { + rt.et.Spec.Timeout = et.Spec.Timeout + rt.et.Status.Phase = types.ExecutorTaskPhaseTimeout + if et.Status.EndTime != nil { + rt.et.Status.EndTime = et.Status.EndTime + } else { + rt.et.Status.EndTime = util.TimeP(time.Now()) + } // cancel the running task rt.cancel() } @@ -1481,6 +1494,7 @@ func (e *Executor) Run(ctx context.Context) error { go e.podsCleanerLoop(ctx) go e.tasksUpdaterLoop(ctx) go e.tasksDataCleanerLoop(ctx) + go e.tasksTimeoutCleanerLoop(ctx) go e.handleTasks(ctx, ch) @@ -1510,3 +1524,48 @@ func (e *Executor) Run(ctx context.Context) error { return nil } + +func (e *Executor) tasksTimeoutCleanerLoop(ctx context.Context) { + for { + e.log.Debug().Msgf("tasksTimeoutCleaner") + + e.tasksTimeoutCleaner(ctx) + + sleepCh := time.NewTimer(2 * time.Second).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } + } +} + +func (e *Executor) tasksTimeoutCleaner(ctx context.Context) { + for _, rtID := range e.runningTasks.ids() { + if rt, ok := e.runningTasks.get(rtID); ok { + if rt.et.Status.Phase == types.ExecutorTaskPhaseRunning && rt.et.Spec.TaskTimeout != 0 && rt.et.Status.StartTime.Add(rt.et.Spec.TaskTimeout).Before(time.Now()) { + rt.et.Status.Phase = types.ExecutorTaskPhaseTimeout + rt.et.Status.EndTime = util.TimeP(time.Now()) + rt.et.Status.FailError = "timeout" + + if rt.et.Status.SetupStep.Phase == types.ExecutorTaskPhaseRunning { + rt.et.Status.SetupStep.Phase = types.ExecutorTaskPhaseFailed + rt.et.Status.SetupStep.EndTime = util.TimeP(time.Now()) + } + for _, s := range rt.et.Status.Steps { + if s.Phase == types.ExecutorTaskPhaseRunning { + s.Phase = types.ExecutorTaskPhaseFailed + s.EndTime = util.TimeP(time.Now()) + } + } + + if err := e.sendExecutorTaskStatus(ctx, rt.et); err != nil { + e.log.Err(err).Send() + } + + rt.cancel() + e.runningTasks.delete(rtID) + } + } + } +} diff --git a/internal/services/gateway/api/run.go b/internal/services/gateway/api/run.go index 583b1f5f1..165b7798a 100644 --- a/internal/services/gateway/api/run.go +++ b/internal/services/gateway/api/run.go @@ -76,6 +76,8 @@ func createRunResponseTask(r *rstypes.Run, rt *rstypes.RunTask, rct *rstypes.Run Level: rct.Level, Depends: rct.Depends, + + TaskTimeout: rct.TaskTimeout, } return t @@ -95,6 +97,8 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *gwa StartTime: rt.StartTime, EndTime: rt.EndTime, + + TaskTimeout: rct.TaskTimeout, } t.SetupStep = &gwapitypes.RunTaskResponseSetupStep{ diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go index 2bfc24df8..ae56a709c 100644 --- a/internal/services/runservice/common/common.go +++ b/internal/services/runservice/common/common.go @@ -143,6 +143,7 @@ func GenExecutorTaskSpecData(r *types.Run, rt *types.RunTask, rc *types.RunConfi Steps: rct.Steps, CachePrefix: cachePrefix, DockerRegistriesAuth: rct.DockerRegistriesAuth, + TaskTimeout: rct.TaskTimeout, } // calculate workspace operations diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 65ace5b20..09d27fa2c 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -624,7 +624,7 @@ func advanceRun(log zerolog.Logger, r *types.Run, rc *types.RunConfig, scheduled if !ok { return errors.Errorf("no such run config task with id %s for run config %s", rt.ID, rc.ID) } - if rt.Status == types.RunTaskStatusFailed { + if rt.Status == types.RunTaskStatusFailed || rt.Status == types.RunTaskStatusTimeout { if !rct.IgnoreFailure { log.Debug().Msgf("marking run %q as failed is task %q is failed", r.ID, rt.ID) r.Result = types.RunResultFailed @@ -774,6 +774,12 @@ func (s *Runservice) updateRunTaskStatus(et *types.ExecutorTask, r *types.Run) e rt.Status != types.RunTaskStatusRunning { wrongstatus = true } + case types.ExecutorTaskPhaseTimeout: + if rt.Status != types.RunTaskStatusTimeout && + rt.Status != types.RunTaskStatusNotStarted && + rt.Status != types.RunTaskStatusRunning { + wrongstatus = true + } } if wrongstatus { s.log.Warn().Msgf("ignoring wrong executor task %q status: %q, rt status: %q", et.ID, et.Status.Phase, rt.Status) @@ -793,6 +799,8 @@ func (s *Runservice) updateRunTaskStatus(et *types.ExecutorTask, r *types.Run) e rt.Status = types.RunTaskStatusSuccess case types.ExecutorTaskPhaseFailed: rt.Status = types.RunTaskStatusFailed + case types.ExecutorTaskPhaseTimeout: + rt.Status = types.RunTaskStatusTimeout } rt.SetupStep.Phase = et.Status.SetupStep.Phase @@ -911,6 +919,33 @@ func (s *Runservice) executorTaskCleaner(ctx context.Context, executorTaskID str } if !et.Status.Phase.IsFinished() { + if et.Status.Phase == types.ExecutorTaskPhaseRunning { + // generate ExecutorTaskSpecData + r, err := s.d.GetRun(tx, et.Spec.RunID) + if err != nil { + return errors.WithStack(err) + } + rt, ok := r.Tasks[et.Spec.RunTaskID] + if !ok { + return errors.Errorf("no such run task with id %s for run %s", et.Spec.RunTaskID, r.ID) + } + rc, err := s.d.GetRunConfig(tx, r.RunConfigID) + if err != nil { + return errors.WithStack(err) + } + et.Spec.ExecutorTaskSpecData = common.GenExecutorTaskSpecData(r, rt, rc) + + if et.Spec.TaskTimeout != 0 && et.Status.StartTime.Add(et.Spec.TaskTimeout).Before(time.Now()) { + s.log.Warn().Msgf("task with id %q timeout", et.ID) + + et.Spec.Timeout = true + if err := s.d.UpdateExecutorTask(tx, et); err != nil { + return errors.WithStack(err) + } + shouldSend = true + } + } + // if the executor doesn't exists anymore mark the not finished executor tasks as failed executor, err := s.d.GetExecutorByExecutorID(tx, et.Spec.ExecutorID) if err != nil { diff --git a/services/gateway/api/types/run.go b/services/gateway/api/types/run.go index 4c9a8bfa8..3a7646504 100644 --- a/services/gateway/api/types/run.go +++ b/services/gateway/api/types/run.go @@ -73,6 +73,8 @@ type RunResponseTask struct { StartTime *time.Time `json:"start_time"` EndTime *time.Time `json:"end_time"` + + TaskTimeout time.Duration `json:"task_timeout"` } type RunTaskResponse struct { @@ -89,6 +91,8 @@ type RunTaskResponse struct { StartTime *time.Time `json:"start_time"` EndTime *time.Time `json:"end_time"` + + TaskTimeout time.Duration `json:"task_timeout"` } type RunTaskResponseSetupStep struct { diff --git a/services/runservice/types/executortask.go b/services/runservice/types/executortask.go index 87abc3e8c..0b1d904cd 100644 --- a/services/runservice/types/executortask.go +++ b/services/runservice/types/executortask.go @@ -23,10 +23,11 @@ const ( ExecutorTaskPhaseStopped ExecutorTaskPhase = "stopped" ExecutorTaskPhaseSuccess ExecutorTaskPhase = "success" ExecutorTaskPhaseFailed ExecutorTaskPhase = "failed" + ExecutorTaskPhaseTimeout ExecutorTaskPhase = "timeout" ) func (s ExecutorTaskPhase) IsFinished() bool { - return s == ExecutorTaskPhaseCancelled || s == ExecutorTaskPhaseStopped || s == ExecutorTaskPhaseSuccess || s == ExecutorTaskPhaseFailed + return s == ExecutorTaskPhaseCancelled || s == ExecutorTaskPhaseStopped || s == ExecutorTaskPhaseSuccess || s == ExecutorTaskPhaseFailed || s == ExecutorTaskPhaseTimeout } type ExecutorTask struct { @@ -53,6 +54,8 @@ type ExecutorTaskSpec struct { // Stop is used to signal from the scheduler when the task must be stopped Stop bool `json:"stop,omitempty"` + // Timeout is used to signal from the scheduler when the task is on timeout + Timeout bool `json:"timeout,omitempty"` *ExecutorTaskSpecData } @@ -79,6 +82,8 @@ type ExecutorTaskSpecData struct { CachePrefix string `json:"cache_prefix,omitempty"` Steps Steps `json:"steps,omitempty"` + + TaskTimeout time.Duration `json:"task_timeout"` } type ExecutorTaskStatus struct { diff --git a/services/runservice/types/run.go b/services/runservice/types/run.go index 2e6802341..ce95473c2 100644 --- a/services/runservice/types/run.go +++ b/services/runservice/types/run.go @@ -178,10 +178,11 @@ const ( RunTaskStatusStopped RunTaskStatus = "stopped" RunTaskStatusSuccess RunTaskStatus = "success" RunTaskStatusFailed RunTaskStatus = "failed" + RunTaskStatusTimeout RunTaskStatus = "timeout" ) func (s RunTaskStatus) IsFinished() bool { - return s == RunTaskStatusCancelled || s == RunTaskStatusSkipped || s == RunTaskStatusStopped || s == RunTaskStatusSuccess || s == RunTaskStatusFailed + return s == RunTaskStatusCancelled || s == RunTaskStatusSkipped || s == RunTaskStatusStopped || s == RunTaskStatusSuccess || s == RunTaskStatusFailed || s == RunTaskStatusTimeout } type RunTaskFetchPhase string @@ -220,6 +221,8 @@ type RunTask struct { StartTime *time.Time `json:"start_time,omitempty"` EndTime *time.Time `json:"end_time,omitempty"` + + TaskTimeout *time.Duration `json:"task_timeout"` } func (rt *RunTask) LogsFetchFinished() bool { diff --git a/services/runservice/types/runconfig.go b/services/runservice/types/runconfig.go index 33e2db83b..4986df794 100644 --- a/services/runservice/types/runconfig.go +++ b/services/runservice/types/runconfig.go @@ -2,6 +2,7 @@ package types import ( "encoding/json" + "time" "agola.io/agola/internal/errors" stypes "agola.io/agola/services/types" @@ -80,6 +81,7 @@ type RunConfigTask struct { NeedsApproval bool `json:"needs_approval,omitempty"` Skip bool `json:"skip,omitempty"` DockerRegistriesAuth map[string]DockerRegistryAuth `json:"docker_registries_auth"` + TaskTimeout time.Duration `json:"task_timeout"` } func (rct *RunConfigTask) DeepCopy() *RunConfigTask { diff --git a/tests/setup_test.go b/tests/setup_test.go index 947b7c314..012305dc0 100644 --- a/tests/setup_test.go +++ b/tests/setup_test.go @@ -41,6 +41,7 @@ import ( "agola.io/agola/internal/util" gwapitypes "agola.io/agola/services/gateway/api/types" gwclient "agola.io/agola/services/gateway/client" + "agola.io/agola/services/runservice/types" rstypes "agola.io/agola/services/runservice/types" "code.gitea.io/sdk/gitea" @@ -1928,3 +1929,96 @@ func TestUserOrgs(t *testing.T) { t.Fatalf("user orgs mismatch (-want +got):\n%s", diff) } } + +func TestTaskTimeout(t *testing.T) { + config := ` + { + runs: [ + { + name: 'run01', + tasks: [ + { + name: 'task01', + runtime: { + containers: [ + { + image: 'alpine/git', + }, + ], + }, + task_timeout: 15000000000, + steps: [ + { type: 'run', command: 'sleep 60' }, + ], + }, + ], + }, + ], + } + ` + + dir := t.TempDir() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, c := setup(ctx, t, dir, false) + + gwClient := gwclient.NewClient(c.Gateway.APIExposedURL, "admintoken") + user, _, err := gwClient.CreateUser(ctx, &gwapitypes.CreateUserRequest{UserName: agolaUser01}) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + t.Logf("created agola user: %s", user.UserName) + + token := createAgolaUserToken(ctx, t, c) + + // From now use the user token + gwClient = gwclient.NewClient(c.Gateway.APIExposedURL, token) + + directRun(t, dir, config, ConfigFormatJsonnet, c.Gateway.APIExposedURL, token) + + time.Sleep(30 * time.Second) + + _ = testutil.Wait(30*time.Second, func() (bool, error) { + run, _, err := gwClient.GetUserRun(ctx, user.ID, 1) + if err != nil { + return false, nil + } + + if run == nil { + return false, nil + } + + if run.Phase != rstypes.RunPhaseFinished { + return false, nil + } + + return true, nil + }) + + run, _, err := gwClient.GetUserRun(ctx, user.ID, 1) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + t.Logf("runs: %s", util.Dump(run)) + + if run == nil { + t.Fatalf("user run not found") + } + if run.Phase != rstypes.RunPhaseFinished { + t.Fatalf("expected run finished got: %s", run.Phase) + } + if run.Result != rstypes.RunResultFailed { + t.Fatalf("expected run failed") + } + if len(run.Tasks) != 1 { + t.Fatalf("expected 1 task got: %d", len(run.Tasks)) + } + for _, task := range run.Tasks { + if task.Status != types.RunTaskStatusTimeout { + t.Fatalf("expected task status timeout got: %s", task.Status) + } + } +}