Skip to content

Commit

Permalink
Fix bug where PipelineRun hangs after task failure
Browse files Browse the repository at this point in the history
Previously, the function GetSchedulableTasks was called only if a PipelineRun
was in a running state, and it would return an error if called when a PipelineTask
had failed. A different change resulted in this function being called when the
PipelineRun was in a stopping state due to TaskRun failure, meaning this function
returned an error and the PipelineRun failed to be reconciled.

This commit renames the function GetSchedulableTasks to GetCandidateTasks, indicating
that it returns any tasks with completed ancestors regardless of the state of PipelineRun
execution. It is the responsibility of call sites to determine which of these candidate tasks
are schedulable. It also updates the function DAGExecutionQueue to pass a list of all
completed Tasks (not just successful or skipped ones) to this function.
  • Loading branch information
lbernick authored and tekton-robot committed May 11, 2022
1 parent 33e3a55 commit 155179b
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 16 deletions.
7 changes: 3 additions & 4 deletions pkg/reconciler/pipeline/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,11 @@ func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
return d, nil
}

// GetSchedulable returns a set of PipelineTask names that can be scheduled,
// given a list of successfully finished doneTasks. It returns tasks which have
// all dependencies marked as done, and thus can be scheduled. If the specified
// GetCandidateTasks returns a set of names of PipelineTasks whose ancestors are all completed,
// given a list of finished doneTasks. If the specified
// doneTasks are invalid (i.e. if it is indicated that a Task is done, but the
// previous Tasks are not done), an error is returned.
func GetSchedulable(g *Graph, doneTasks ...string) (sets.String, error) {
func GetCandidateTasks(g *Graph, doneTasks ...string) (sets.String, error) {
roots := getRoots(g)
tm := sets.NewString(doneTasks...)
d := sets.NewString()
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/pipeline/dag/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestGetSchedulable(t *testing.T) {
}}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
tasks, err := dag.GetSchedulable(g, tc.finished...)
tasks, err := dag.GetCandidateTasks(g, tc.finished...)
if err != nil {
t.Fatalf("Didn't expect error when getting next tasks for %v but got %v", tc.finished, err)
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestGetSchedulable_Invalid(t *testing.T) {
}}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
_, err := dag.GetSchedulable(g, tc.finished...)
_, err := dag.GetCandidateTasks(g, tc.finished...)
if err == nil {
t.Fatalf("Expected error for invalid done tasks %v but got none", tc.finished)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) {
}
// candidateTasks is initialized to DAG root nodes to start pipeline execution
// candidateTasks is derived based on completed (successful or failed) tasks and/or skipped tasks
candidateTasks, err := dag.GetSchedulable(facts.TasksGraph, facts.successfulOrSkippedDAGTasks()...)
candidateTasks, err := dag.GetCandidateTasks(facts.TasksGraph, facts.completedOrSkippedDAGTasks()...)
if err != nil {
return tasks, err
}
Expand Down Expand Up @@ -624,13 +624,13 @@ func (facts *PipelineRunFacts) GetPipelineTaskStatus() map[string]string {
return tStatus
}

// successfulOrSkippedTasks returns a list of the names of all of the PipelineTasks in state
// which have successfully completed or skipped
func (facts *PipelineRunFacts) successfulOrSkippedDAGTasks() []string {
// completedOrSkippedDAGTasks returns a list of the names of all of the PipelineTasks in state
// which have completed or been skipped
func (facts *PipelineRunFacts) completedOrSkippedDAGTasks() []string {
tasks := []string{}
for _, t := range facts.State {
if facts.isDAGTask(t.PipelineTask.Name) {
if t.IsSuccessful() || t.Skip(facts).IsSkipped {
if t.IsDone(facts) {
tasks = append(tasks, t.PipelineTask.Name)
}
}
Expand Down
192 changes: 187 additions & 5 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,189 @@ func TestDAGExecutionQueue(t *testing.T) {
}
}

func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
// TestDAGExecutionQueueSequentialTasks checks which PipelineTasks DAGExecutionQueue returns
// for a two-task pipeline in which task-2 runs after task-1, as the TaskRuns of the two
// tasks move through the absent, running, succeeded and failed states.
//
// In every case below a task is expected in the queue only when it has no TaskRun yet and,
// for task-2, task-1 has succeeded.
func TestDAGExecutionQueueSequentialTasks(t *testing.T) {
	// upstream has no dependencies; downstream declares RunAfter on it.
	upstream := ResolvedPipelineRunTask{
		PipelineTask: &v1beta1.PipelineTask{
			Name:    "task-1",
			TaskRef: &v1beta1.TaskRef{Name: "task"},
		},
		TaskRunName:           "task-1",
		ResolvedTaskResources: &resources.ResolvedTaskResources{TaskSpec: &task.Spec},
	}
	downstream := ResolvedPipelineRunTask{
		PipelineTask: &v1beta1.PipelineTask{
			Name:     "task-2",
			TaskRef:  &v1beta1.TaskRef{Name: "task"},
			RunAfter: []string{"task-1"},
		},
		TaskRunName:           "task-2",
		ResolvedTaskResources: &resources.ResolvedTaskResources{TaskSpec: &task.Spec},
	}

	// specStatus is left at its zero value by every case in this table.
	testCases := []struct {
		name          string
		firstTaskRun  *v1beta1.TaskRun
		secondTaskRun *v1beta1.TaskRun
		specStatus    v1beta1.PipelineRunSpecStatus
		wantFirst     bool
		wantSecond    bool
	}{
		{
			name:      "not started",
			wantFirst: true,
		},
		{
			name:         "first task running",
			firstTaskRun: newTaskRun(trs[0]),
		},
		{
			name:         "first task succeeded",
			firstTaskRun: makeSucceeded(trs[0]),
			wantSecond:   true,
		},
		{
			name:         "first task failed",
			firstTaskRun: makeFailed(trs[0]),
		},
		{
			name:          "first task succeeded, second task running",
			firstTaskRun:  makeSucceeded(trs[0]),
			secondTaskRun: newTaskRun(trs[1]),
		},
		{
			name:          "first task succeeded, second task succeeded",
			firstTaskRun:  makeSucceeded(trs[0]),
			secondTaskRun: makeSucceeded(trs[1]),
		},
		{
			name:          "first task succeeded, second task failed",
			firstTaskRun:  makeSucceeded(trs[0]),
			secondTaskRun: makeFailed(trs[1]),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Both resolved tasks are shared by all subtests: attach this case's TaskRuns
			// now and detach them again when the subtest returns.
			upstream.TaskRun, downstream.TaskRun = tc.firstTaskRun, tc.secondTaskRun
			defer func() {
				downstream.TaskRun = nil
				upstream.TaskRun = nil
			}()

			state := PipelineRunState{&upstream, &downstream}
			graph, err := dagFromState(state)
			if err != nil {
				t.Fatalf("Unexpected error while building DAG for state %v: %v", state, err)
			}

			facts := PipelineRunFacts{
				State:           state,
				SpecStatus:      tc.specStatus,
				TasksGraph:      graph,
				FinalTasksGraph: &dag.Graph{},
			}
			got, err := facts.DAGExecutionQueue()
			if err != nil {
				t.Errorf("unexpected error getting DAG execution queue but got error %s", err)
			}

			// Assemble the expected queue in pipeline order from the want flags.
			var want PipelineRunState
			if tc.wantFirst {
				want = append(want, &upstream)
			}
			if tc.wantSecond {
				want = append(want, &downstream)
			}

			// EquateEmpty lets a nil expected queue match an empty returned one.
			if d := cmp.Diff(want, got, cmpopts.EquateEmpty()); d != "" {
				t.Errorf("Didn't get expected execution queue: %s", diff.PrintWantGot(d))
			}
		})
	}
}

// TestDAGExecutionQueueSequentialRuns checks which PipelineTasks DAGExecutionQueue returns
// for a pipeline of two custom tasks in which task-2 runs after task-1, as the Runs of the
// two tasks move through the absent, running, succeeded and failed states.
//
// In every case below a task is expected in the queue only when it has no Run yet and,
// for task-2, task-1 has succeeded.
func TestDAGExecutionQueueSequentialRuns(t *testing.T) {
	// upstream has no dependencies; downstream declares RunAfter on it. Both are marked
	// as custom tasks, so their execution is tracked through Run rather than TaskRun.
	upstream := ResolvedPipelineRunTask{
		PipelineTask: &v1beta1.PipelineTask{
			Name:    "task-1",
			TaskRef: &v1beta1.TaskRef{Name: "task"},
		},
		RunName:    "task-1",
		CustomTask: true,
	}
	downstream := ResolvedPipelineRunTask{
		PipelineTask: &v1beta1.PipelineTask{
			Name:     "task-2",
			TaskRef:  &v1beta1.TaskRef{Name: "task"},
			RunAfter: []string{"task-1"},
		},
		RunName:    "task-2",
		CustomTask: true,
	}

	// specStatus is left at its zero value by every case in this table.
	testCases := []struct {
		name       string
		firstRun   *v1alpha1.Run
		secondRun  *v1alpha1.Run
		specStatus v1beta1.PipelineRunSpecStatus
		wantFirst  bool
		wantSecond bool
	}{
		{
			name:      "not started",
			wantFirst: true,
		},
		{
			name:     "first run running",
			firstRun: newRun(runs[0]),
		},
		{
			name:       "first run succeeded",
			firstRun:   makeRunSucceeded(runs[0]),
			wantSecond: true,
		},
		{
			name:     "first run failed",
			firstRun: makeRunFailed(runs[0]),
		},
		{
			name:      "first run succeeded, second run running",
			firstRun:  makeRunSucceeded(runs[0]),
			secondRun: newRun(runs[1]),
		},
		{
			name:      "first run succeeded, second run succeeded",
			firstRun:  makeRunSucceeded(runs[0]),
			secondRun: makeRunSucceeded(runs[1]),
		},
		{
			name:      "first run succeeded, second run failed",
			firstRun:  makeRunSucceeded(runs[0]),
			secondRun: makeRunFailed(runs[1]),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Both resolved tasks are shared by all subtests: attach this case's Runs
			// now and detach them again when the subtest returns.
			upstream.Run, downstream.Run = tc.firstRun, tc.secondRun
			defer func() {
				downstream.Run = nil
				upstream.Run = nil
			}()

			state := PipelineRunState{&upstream, &downstream}
			graph, err := dagFromState(state)
			if err != nil {
				t.Fatalf("Unexpected error while building DAG for state %v: %v", state, err)
			}

			facts := PipelineRunFacts{
				State:           state,
				SpecStatus:      tc.specStatus,
				TasksGraph:      graph,
				FinalTasksGraph: &dag.Graph{},
			}
			got, err := facts.DAGExecutionQueue()
			if err != nil {
				t.Errorf("unexpected error getting DAG execution queue but got error %s", err)
			}

			// Assemble the expected queue in pipeline order from the want flags.
			var want PipelineRunState
			if tc.wantFirst {
				want = append(want, &upstream)
			}
			if tc.wantSecond {
				want = append(want, &downstream)
			}

			// EquateEmpty lets a nil expected queue match an empty returned one.
			if d := cmp.Diff(want, got, cmpopts.EquateEmpty()); d != "" {
				t.Errorf("Didn't get expected execution queue: %s", diff.PrintWantGot(d))
			}
		})
	}
}

func TestPipelineRunState_CompletedOrSkippedDAGTasks(t *testing.T) {
largePipelineState := buildPipelineStateWithLargeDepencyGraph(t)
tcs := []struct {
name string
Expand Down Expand Up @@ -823,7 +1005,7 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
}, {
name: "one-task-failed",
state: oneFailedState,
expectedNames: []string{pts[1].Name},
expectedNames: []string{pts[0].Name, pts[1].Name},
}, {
name: "all-finished",
state: allFinishedState,
Expand All @@ -849,7 +1031,7 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
name: "conditional task skipped as the condition execution resulted in failure but the other pipeline task" +
"not skipped since it failed",
state: conditionCheckFailedWithOthersFailedState,
expectedNames: []string{pts[5].Name},
expectedNames: []string{pts[5].Name, pts[0].Name},
}, {
name: "large deps, not started",
state: largePipelineState,
Expand All @@ -865,7 +1047,7 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
}, {
name: "one-run-failed",
state: oneRunFailedState,
expectedNames: []string{pts[13].Name},
expectedNames: []string{pts[12].Name, pts[13].Name},
}}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -879,7 +1061,7 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
TasksGraph: d,
FinalTasksGraph: &dag.Graph{},
}
names := facts.successfulOrSkippedDAGTasks()
names := facts.completedOrSkippedDAGTasks()
if d := cmp.Diff(names, tc.expectedNames); d != "" {
t.Errorf("Expected to get completed names %v but got something different %s", tc.expectedNames, diff.PrintWantGot(d))
}
Expand Down

0 comments on commit 155179b

Please sign in to comment.