Skip to content

Commit

Permalink
aggregate status of tasks
Browse files Browse the repository at this point in the history
Implementing TEP-0049, it is now possible to access aggregate execution
status of all tasks using `$(tasks.status)`. This context variable is
only available in a finally task.
  • Loading branch information
pritidesai committed Apr 1, 2021
1 parent 1f5980f commit c15d8e1
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 9 deletions.
53 changes: 51 additions & 2 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ what kind of events are triggered based on the `Pipelinerun` status.

### Using Execution `Status` of `pipelineTask`

Finally Task can utilize execution status of any of the `pipelineTasks` under `tasks` section using param:
A `finally` Task can utilize execution status of any of the `pipelineTasks` under `tasks` section using param:

```yaml
finally:
Expand Down Expand Up @@ -900,6 +900,39 @@ This kind of variable can have any one of the values from the following table:

For an end-to-end example, see [`status` in a `PipelineRun`](../examples/v1beta1/pipelineruns/pipelinerun-task-execution-status.yaml).

### Using Aggregate Execution `Status` of All `Tasks`

A `finally` task can utilize aggregate status of all `Tasks` using `params`:

```yaml
finally:
- name: finaltask
params:
- name: aggreateTasksStatus
value: "$(tasks.status)"
taskSpec:
params:
- name: aggreateTasksStatus
steps:
- image: ubuntu
name: check-task-status
script: |
if [ $(params.aggreateTasksStatus) == "Failed" ]
then
echo "Looks like one or more tasks returned failure, continue processing the failure"
fi
```

This kind of variable can have any one of the values from the following table:

| Status | Description |
| ------- | -----------|
| `Succeeded` | all `tasks` have succeeded |
| `Failed` | one ore more `tasks` failed |
| `Completed` | all `tasks` completed successfully including one or more skipped tasks |
| `None` | no aggregate execution status available (i.e. none of the above), one or more `tasks` could be pending/running/cancelled/timedout |


### Guard `Finally Task` execution using `WhenExpressions`

Similar to `Tasks`, `Finally Tasks` can be guarded using [`WhenExpressions`](#guard-task-execution-using-whenexpressions)
Expand Down Expand Up @@ -985,7 +1018,7 @@ If the `WhenExpressions` in a `Finally Task` use `Results` from a skipped or fai
#### `WhenExpressions` using `Execution Status` of `PipelineTask` in `Finally Tasks`

`WhenExpressions` in `Finally Tasks` can utilize [`Execution Status` of `PipelineTasks`](#using-execution-status-of-pipelinetask),
as as demonstrated using [`golang-build`](https://github.com/tektoncd/catalog/tree/main/task/golang-build/0.1) and
as demonstrated using [`golang-build`](https://github.com/tektoncd/catalog/tree/main/task/golang-build/0.1) and
[`send-to-channel-slack`](https://github.com/tektoncd/catalog/tree/main/task/send-to-channel-slack/0.1) Catalog `Tasks`:

```yaml
Expand Down Expand Up @@ -1013,6 +1046,22 @@ spec:

For an end-to-end example, see [PipelineRun with WhenExpressions](../examples/v1beta1/pipelineruns/pipelinerun-with-when-expressions.yaml).

#### `WhenExpressions` using `Aggregate Execution Status` of `Tasks` in `Finally Tasks`

`WhenExpressions` in `Finally Tasks` can utilize
[`Aggregate Execution Status` of `Tasks`](#using-aggregate-execution-status-of-all-tasks) as demonstrated:

```yaml
finally:
- name: notify-any-failure # executed only when one or more tasks fail
when:
- input: $(tasks.status)
operator: in
values: ["Failed"]
taskRef:
name: notify-failure
```

### Known Limitations

#### Specifying `Resources` in Final Tasks
Expand Down
2 changes: 1 addition & 1 deletion docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ For instructions on using variable substitutions see the relevant section of [th
| `context.pipelineRun.uid` | The uid of the `PipelineRun` that this `Pipeline` is running in. |
| `context.pipeline.name` | The name of this `Pipeline` . |
| `tasks.<pipelineTaskName>.status` | The execution status of the specified `pipelineTask`, only available in `finally` tasks. |

| `tasks.status` | An aggregate status of all `Tasks`, only available in `finally` tasks. |

## Variables available in a `Task`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,17 @@ spec:
if [[ $(params.task1Status) != "Succeeded" || $(params.task2Status) != "None" ]]; then
exit 1;
fi
- name: task4 # this task verifies the aggregate status of all tasks, it fails if verification fails
params:
- name: aggregateStatus
value: "$(tasks.status)"
taskSpec:
params:
- name: aggregateStatus
steps:
- image: alpine
name: verify-aggregate-tasks-status
script: |
if [[ $(params.aggregateStatus) != "Completed" ]]; then
exit 1;
fi
5 changes: 5 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"knative.dev/pkg/apis"
)

const (
// PipelineTasksAggregateStatus is a param representing aggregate status of all dag pipelineTasks
PipelineTasksAggregateStatus = "tasks.status"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +genclient:noStatus
Expand Down
8 changes: 6 additions & 2 deletions pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ func validateExecutionStatusVariablesInTasks(tasks []PipelineTask) (errs *apis.F
for _, p := range ps {
// check if it contains context variable accessing execution status - $(tasks.taskname.status)
if containsExecutionStatusRef(p) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("pipeline tasks can not refer to execution status of any other pipeline task"),
"value").ViaFieldKey("params", param.Name).ViaFieldIndex("tasks", idx))
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("pipeline tasks can not refer to execution status of any other pipeline task"+
" or aggregate status of tasks"), "value").ViaFieldKey("params", param.Name).ViaFieldIndex("tasks", idx))
}
}
}
Expand Down Expand Up @@ -247,6 +247,10 @@ func validateExecutionStatusVariablesExpressions(expressions []string, ptNames s
for _, expression := range expressions {
// check if it contains context variable accessing execution status - $(tasks.taskname.status)
if containsExecutionStatusRef(expression) {
// its a reference to aggregate status of dag tasks - $(tasks.status)
if expression == PipelineTasksAggregateStatus {
continue
}
// strip tasks. and .status from tasks.taskname.status to further verify task name
pt := strings.TrimSuffix(strings.TrimPrefix(expression, "tasks."), ".status")
// report an error if the task name does not exist in the list of dag tasks
Expand Down
34 changes: 31 additions & 3 deletions pkg/apis/pipeline/v1beta1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2289,6 +2289,8 @@ func TestPipelineTasksExecutionStatus(t *testing.T) {
TaskRef: &TaskRef{Name: "bar-task"},
Params: []Param{{
Name: "foo-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.foo.status)"},
}, {
Name: "tasks-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.status)"},
}},
}},
}, {
Expand Down Expand Up @@ -2334,9 +2336,22 @@ func TestPipelineTasksExecutionStatus(t *testing.T) {
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task`,
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[bar-status].value"},
},
}, {
name: "invalid string variable in dag task accessing aggregate status of tasks",
tasks: []PipelineTask{{
Name: "foo",
TaskRef: &TaskRef{Name: "foo-task"},
Params: []Param{{
Name: "tasks-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.status)"},
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[tasks-status].value"},
},
}, {
name: "invalid variable concatenated with extra string in dag task accessing pipelineTask status",
tasks: []PipelineTask{{
Expand All @@ -2347,7 +2362,7 @@ func TestPipelineTasksExecutionStatus(t *testing.T) {
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task`,
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[bar-status].value"},
},
}, {
Expand All @@ -2360,9 +2375,22 @@ func TestPipelineTasksExecutionStatus(t *testing.T) {
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task`,
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[bar-status].value"},
},
}, {
name: "invalid array variable in dag task accessing aggregate tasks status",
tasks: []PipelineTask{{
Name: "foo",
TaskRef: &TaskRef{Name: "foo-task"},
Params: []Param{{
Name: "tasks-status", Value: ArrayOrString{Type: ParamTypeArray, ArrayVal: []string{"$(tasks.status)"}},
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[tasks-status].value"},
},
}, {
name: "invalid string variable in finally accessing missing pipelineTask status",
finalTasks: []PipelineTask{{
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/pipelinerun/resources/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func ApplyTaskResults(targets PipelineRunState, resolvedResultRefs ResolvedResul
}
}

//ApplyPipelineTaskContext replaces context variables referring to execution status with the specified status
// ApplyPipelineTaskContext replaces context variables referring to execution status with the specified status
func ApplyPipelineTaskContext(state PipelineRunState, replacements map[string]string) {
for _, resolvedPipelineRunTask := range state {
if resolvedPipelineRunTask.PipelineTask != nil {
Expand Down
22 changes: 22 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,28 @@ func (facts *PipelineRunFacts) GetPipelineTaskStatus() map[string]string {
tStatus[PipelineTaskStatusPrefix+t.PipelineTask.Name+PipelineTaskStatusSuffix] = s
}
}
// initialize aggregate status of all dag tasks to None
aggregateStatus := PipelineTaskStateNone
if facts.checkDAGTasksDone() {
// all dag tasks are done, change the aggregate status to succeeded
// will reset it to failed/skipped if needed
aggregateStatus = v1beta1.PipelineRunReasonSuccessful.String()
for _, t := range facts.State {
if facts.isDAGTask(t.PipelineTask.Name) {
// if any of the dag task failed, change the aggregate status to failed and return
if t.IsConditionStatusFalse() {
aggregateStatus = v1beta1.PipelineRunReasonFailed.String()
break
}
// if any of the dag task skipped, change the aggregate status to completed
// but continue checking for any other failure
if t.Skip(facts) {
aggregateStatus = v1beta1.PipelineRunReasonCompleted.String()
}
}
}
}
tStatus[v1beta1.PipelineTasksAggregateStatus] = aggregateStatus
return tStatus
}

Expand Down
31 changes: 31 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,7 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) {
expectedStatus: map[string]string{
PipelineTaskStatusPrefix + pts[0].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
PipelineTaskStatusPrefix + pts[1].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
v1beta1.PipelineTasksAggregateStatus: PipelineTaskStateNone,
},
}, {
name: "one-task-started",
Expand All @@ -1370,6 +1371,7 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) {
expectedStatus: map[string]string{
PipelineTaskStatusPrefix + pts[0].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
PipelineTaskStatusPrefix + pts[1].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
v1beta1.PipelineTasksAggregateStatus: PipelineTaskStateNone,
},
}, {
name: "one-task-finished",
Expand All @@ -1378,6 +1380,7 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) {
expectedStatus: map[string]string{
PipelineTaskStatusPrefix + pts[0].Name + PipelineTaskStatusSuffix: v1beta1.TaskRunReasonSuccessful.String(),
PipelineTaskStatusPrefix + pts[1].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
v1beta1.PipelineTasksAggregateStatus: PipelineTaskStateNone,
},
}, {
name: "one-task-failed",
Expand All @@ -1386,6 +1389,7 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) {
expectedStatus: map[string]string{
PipelineTaskStatusPrefix + pts[0].Name + PipelineTaskStatusSuffix: v1beta1.TaskRunReasonFailed.String(),
PipelineTaskStatusPrefix + pts[1].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
v1beta1.PipelineTasksAggregateStatus: v1beta1.PipelineRunReasonFailed.String(),
},
}, {
name: "all-finished",
Expand All @@ -1394,6 +1398,7 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) {
expectedStatus: map[string]string{
PipelineTaskStatusPrefix + pts[0].Name + PipelineTaskStatusSuffix: v1beta1.TaskRunReasonSuccessful.String(),
PipelineTaskStatusPrefix + pts[1].Name + PipelineTaskStatusSuffix: v1beta1.TaskRunReasonSuccessful.String(),
v1beta1.PipelineTasksAggregateStatus: v1beta1.PipelineRunReasonSuccessful.String(),
},
}, {
name: "task-with-when-expressions-passed",
Expand All @@ -1408,6 +1413,7 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) {
dagTasks: []v1beta1.PipelineTask{pts[9]},
expectedStatus: map[string]string{
PipelineTaskStatusPrefix + pts[9].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
v1beta1.PipelineTasksAggregateStatus: PipelineTaskStateNone,
},
}, {
name: "tasks-when-expression-failed-and-task-skipped",
Expand All @@ -1421,6 +1427,7 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) {
dagTasks: []v1beta1.PipelineTask{pts[10]},
expectedStatus: map[string]string{
PipelineTaskStatusPrefix + pts[10].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
v1beta1.PipelineTasksAggregateStatus: v1beta1.PipelineRunReasonCompleted.String(),
},
}, {
name: "when-expression-task-with-parent-started",
Expand All @@ -1441,13 +1448,37 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) {
expectedStatus: map[string]string{
PipelineTaskStatusPrefix + pts[0].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
PipelineTaskStatusPrefix + pts[11].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
v1beta1.PipelineTasksAggregateStatus: PipelineTaskStateNone,
},
}, {
name: "task-cancelled",
state: taskCancelled,
dagTasks: []v1beta1.PipelineTask{pts[4]},
expectedStatus: map[string]string{
PipelineTaskStatusPrefix + pts[4].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
v1beta1.PipelineTasksAggregateStatus: PipelineTaskStateNone,
},
}, {
name: "one-skipped-one-failed-aggregate-status-must-be-failed",
state: PipelineRunState{{
PipelineTask: &pts[10],
TaskRunName: "pr-guardedtask-skipped",
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}, {
PipelineTask: &pts[0],
TaskRunName: "pipelinerun-mytask1",
TaskRun: makeFailed(trs[0]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}},
dagTasks: []v1beta1.PipelineTask{pts[0], pts[10]},
expectedStatus: map[string]string{
PipelineTaskStatusPrefix + pts[0].Name + PipelineTaskStatusSuffix: v1beta1.PipelineRunReasonFailed.String(),
PipelineTaskStatusPrefix + pts[10].Name + PipelineTaskStatusSuffix: PipelineTaskStateNone,
v1beta1.PipelineTasksAggregateStatus: v1beta1.PipelineRunReasonFailed.String(),
},
}}
for _, tc := range tcs {
Expand Down
12 changes: 12 additions & 0 deletions test/pipelinefinally_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ func TestPipelineLevelFinally_OneDAGTaskFailed_InvalidTaskResult_Failure(t *test
Type: "string",
StringVal: "$(tasks.dagtask3.status)",
},
}, {
Name: "dagtasks-aggregate-status",
Value: v1beta1.ArrayOrString{
Type: "string",
StringVal: "$(tasks.status)",
},
}},
},
// final task consuming result from a failed dag task
Expand Down Expand Up @@ -315,6 +321,10 @@ func TestPipelineLevelFinally_OneDAGTaskFailed_InvalidTaskResult_Failure(t *test
if p.Value.StringVal != resources.PipelineTaskStateNone {
t.Errorf("Task param \"%s\" is set to \"%s\", expected it to resolve to \"%s\"", param, p.Value.StringVal, resources.PipelineTaskStateNone)
}
case "dagtasks-aggregate-status":
if p.Value.StringVal != v1beta1.PipelineRunReasonFailed.String() {
t.Errorf("Task param \"%s\" is set to \"%s\", expected it to resolve to \"%s\"", param, p.Value.StringVal, v1beta1.PipelineRunReasonFailed.String())
}
}
}
case n == "finaltaskconsumingdagtask5":
Expand Down Expand Up @@ -510,6 +520,8 @@ func getTaskVerifyingStatus(t *testing.T, namespace string) *v1beta1.Task {
Name: "dagtask2-status",
}, {
Name: "dagtask3-status",
}, {
Name: "dagtasks-aggregate-status",
}}
return getTaskDef(helpers.ObjectNameForTest(t), namespace, "exit 0", params, []v1beta1.TaskResult{})
}
Expand Down

0 comments on commit c15d8e1

Please sign in to comment.