Skip to content

Commit

Permalink
aggregate status of tasks
Browse files Browse the repository at this point in the history
Implementing TEP-0049, it is now possible to access aggregate execution
status of all tasks using `$(tasks.status)`. This context variable is
only available in a finally task.
  • Loading branch information
pritidesai authored and tekton-robot committed May 6, 2021
1 parent 9a9f896 commit d3cc94e
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 15 deletions.
57 changes: 55 additions & 2 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,8 @@ what kind of events are triggered based on the `Pipelinerun` status.

### Using Execution `Status` of `pipelineTask`

Finally Task can utilize execution status of any of the `pipelineTasks` under `tasks` section using param:
A `pipeline` can check the status of a specific `pipelineTask` from the `tasks` section in `finally` through the task
parameters:

```yaml
finally:
Expand Down Expand Up @@ -900,6 +901,40 @@ This kind of variable can have any one of the values from the following table:

For an end-to-end example, see [`status` in a `PipelineRun`](../examples/v1beta1/pipelineruns/pipelinerun-task-execution-status.yaml).

### Using Aggregate Execution `Status` of All `Tasks`

A `pipeline` can check an aggregate status of all the `tasks` section in `finally` through the task parameters:

```yaml
finally:
- name: finaltask
params:
- name: aggregateTasksStatus
value: "$(tasks.status)"
taskSpec:
params:
- name: aggregateTasksStatus
steps:
- image: ubuntu
name: check-task-status
script: |
if [ $(params.aggregateTasksStatus) == "Failed" ]
then
echo "Looks like one or more tasks returned failure, continue processing the failure"
fi
```

This kind of variable can have any one of the values from the following table:

| Status | Description |
| ------- | -----------|
| `Succeeded` | all `tasks` have succeeded |
| `Failed` | one ore more `tasks` failed |
| `Completed` | all `tasks` completed successfully including one or more skipped tasks |
| `None` | no aggregate execution status available (i.e. none of the above), one or more `tasks` could be pending/running/cancelled/timedout |

For an end-to-end example, see [`$(tasks.status)` usage in a `Pipeline`](../examples/v1beta1/pipelineruns/pipelinerun-task-execution-status.yaml).

### Guard `Finally Task` execution using `WhenExpressions`

Similar to `Tasks`, `Finally Tasks` can be guarded using [`WhenExpressions`](#guard-task-execution-using-whenexpressions)
Expand Down Expand Up @@ -985,7 +1020,7 @@ If the `WhenExpressions` in a `Finally Task` use `Results` from a skipped or fai
#### `WhenExpressions` using `Execution Status` of `PipelineTask` in `Finally Tasks`

`WhenExpressions` in `Finally Tasks` can utilize [`Execution Status` of `PipelineTasks`](#using-execution-status-of-pipelinetask),
as as demonstrated using [`golang-build`](https://github.com/tektoncd/catalog/tree/main/task/golang-build/0.1) and
as demonstrated using [`golang-build`](https://github.com/tektoncd/catalog/tree/main/task/golang-build/0.1) and
[`send-to-channel-slack`](https://github.com/tektoncd/catalog/tree/main/task/send-to-channel-slack/0.1) Catalog `Tasks`:

```yaml
Expand Down Expand Up @@ -1013,6 +1048,24 @@ spec:

For an end-to-end example, see [PipelineRun with WhenExpressions](../examples/v1beta1/pipelineruns/pipelinerun-with-when-expressions.yaml).

#### `WhenExpressions` using `Aggregate Execution Status` of `Tasks` in `Finally Tasks`

`WhenExpressions` in `Finally Tasks` can utilize
[`Aggregate Execution Status` of `Tasks`](#using-aggregate-execution-status-of-all-tasks) as demonstrated:

```yaml
finally:
- name: notify-any-failure # executed only when one or more tasks fail
when:
- input: $(tasks.status)
operator: in
values: ["Failed"]
taskRef:
name: notify-failure
```

For an end-to-end example, see [PipelineRun with WhenExpressions](../examples/v1beta1/pipelineruns/pipelinerun-with-when-expressions.yaml).

### Known Limitations

#### Specifying `Resources` in Final Tasks
Expand Down
2 changes: 1 addition & 1 deletion docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ For instructions on using variable substitutions see the relevant section of [th
| `context.pipelineRun.uid` | The uid of the `PipelineRun` that this `Pipeline` is running in. |
| `context.pipeline.name` | The name of this `Pipeline` . |
| `tasks.<pipelineTaskName>.status` | The execution status of the specified `pipelineTask`, only available in `finally` tasks. |

| `tasks.status` | An aggregate status of all `tasks`, only available in `finally` tasks. |

## Variables available in a `Task`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,17 @@ spec:
if [[ $(params.task1Status) != "Succeeded" || $(params.task2Status) != "None" ]]; then
exit 1;
fi
- name: task4 # this task verifies the aggregate status of all tasks, it fails if verification fails
params:
- name: aggregateStatus
value: "$(tasks.status)"
taskSpec:
params:
- name: aggregateStatus
steps:
- image: alpine
name: verify-aggregate-tasks-status
script: |
if [[ $(params.aggregateStatus) != "Completed" ]]; then
exit 1;
fi
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,24 @@ spec:
- name: echo
image: ubuntu
script: exit 1
- name: finally-task-should-be-executed # when expression using execution status, param and results
- name: finally-task-should-be-skipped-4 # when expression using tasks execution status, evaluates to false
when:
- input: "$(tasks.status)"
operator: in
values: ["Failure"]
taskSpec:
steps:
- name: echo
image: ubuntu
script: exit 1
- name: finally-task-should-be-executed # when expression using execution status, tasks execution status, param, and results
when:
- input: "$(tasks.echo-file-exists.status)"
operator: in
values: ["Succeeded"]
- input: "$(tasks.status)"
operator: in
values: ["Succeeded"]
- input: "$(tasks.check-file.results.exists)"
operator: in
values: ["yes"]
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"knative.dev/pkg/apis"
)

const (
// PipelineTasksAggregateStatus is a param representing aggregate status of all dag pipelineTasks
PipelineTasksAggregateStatus = "tasks.status"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +genclient:noStatus
Expand Down
19 changes: 14 additions & 5 deletions pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,25 +203,30 @@ func validateExecutionStatusVariablesInTasks(tasks []PipelineTask) (errs *apis.F
for _, param := range t.Params {
// retrieve a list of substitution expression from a param
if ps, ok := GetVarSubstitutionExpressionsForParam(param); ok {
// validate tasks.pipelineTask.status if this expression is not a result reference
// validate tasks.pipelineTask.status/tasks.status if this expression is not a result reference
if !LooksLikeContainsResultRefs(ps) {
for _, p := range ps {
// check if it contains context variable accessing execution status - $(tasks.taskname.status)
// or an aggregate status - $(tasks.status)
if containsExecutionStatusRef(p) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("pipeline tasks can not refer to execution status of any other pipeline task"),
"value").ViaFieldKey("params", param.Name).ViaFieldIndex("tasks", idx))
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("pipeline tasks can not refer to execution status of any other pipeline task"+
" or aggregate status of tasks"), "value").ViaFieldKey("params", param.Name).ViaFieldIndex("tasks", idx))
}
}
}
}
}
for i, we := range t.WhenExpressions {
// retrieve a list of substitution expression from a when expression
if expressions, ok := we.GetVarSubstitutionExpressions(); ok {
// validate tasks.pipelineTask.status/tasks.status if this expression is not a result reference
if !LooksLikeContainsResultRefs(expressions) {
for _, e := range expressions {
// check if it contains context variable accessing execution status - $(tasks.taskname.status)
// or an aggregate status - $(tasks.status)
if containsExecutionStatusRef(e) {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("when expressions in pipeline tasks can not refer to execution status of any other pipeline task"),
"").ViaFieldIndex("when", i).ViaFieldIndex("tasks", idx))
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("when expressions in pipeline tasks can not refer to execution status of any other pipeline task"+
" or aggregate status of tasks"), "").ViaFieldIndex("when", i).ViaFieldIndex("tasks", idx))
}
}
}
Expand Down Expand Up @@ -257,6 +262,10 @@ func validateExecutionStatusVariablesExpressions(expressions []string, ptNames s
// validate tasks.pipelineTask.status if this expression is not a result reference
if !LooksLikeContainsResultRefs(expressions) {
for _, expression := range expressions {
// its a reference to aggregate status of dag tasks - $(tasks.status)
if expression == PipelineTasksAggregateStatus {
continue
}
// check if it contains context variable accessing execution status - $(tasks.taskname.status)
if containsExecutionStatusRef(expression) {
// strip tasks. and .status from tasks.taskname.status to further verify task name
Expand Down
42 changes: 37 additions & 5 deletions pkg/apis/pipeline/v1beta1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2290,11 +2290,17 @@ func TestPipelineTasksExecutionStatus(t *testing.T) {
TaskRef: &TaskRef{Name: "bar-task"},
Params: []Param{{
Name: "foo-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.foo.status)"},
}, {
Name: "tasks-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.status)"},
}},
WhenExpressions: WhenExpressions{WhenExpression{
WhenExpressions: WhenExpressions{{
Input: "$(tasks.foo.status)",
Operator: selection.In,
Values: []string{"Failure"},
}, {
Input: "$(tasks.status)",
Operator: selection.In,
Values: []string{"Success"},
}},
}},
}, {
Expand Down Expand Up @@ -2350,12 +2356,25 @@ func TestPipelineTasksExecutionStatus(t *testing.T) {
}},
}},
expectedError: *apis.ErrGeneric("").Also(&apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task`,
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[bar-status].value"},
}).Also(&apis.FieldError{
Message: `invalid value: when expressions in pipeline tasks can not refer to execution status of any other pipeline task`,
Message: `invalid value: when expressions in pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].when[0]"},
}),
}, {
name: "invalid string variable in dag task accessing aggregate status of tasks",
tasks: []PipelineTask{{
Name: "foo",
TaskRef: &TaskRef{Name: "foo-task"},
Params: []Param{{
Name: "tasks-status", Value: ArrayOrString{Type: ParamTypeString, StringVal: "$(tasks.status)"},
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[tasks-status].value"},
},
}, {
name: "invalid variable concatenated with extra string in dag task accessing pipelineTask status",
tasks: []PipelineTask{{
Expand All @@ -2366,7 +2385,7 @@ func TestPipelineTasksExecutionStatus(t *testing.T) {
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task`,
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[bar-status].value"},
},
}, {
Expand All @@ -2379,9 +2398,22 @@ func TestPipelineTasksExecutionStatus(t *testing.T) {
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task`,
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[bar-status].value"},
},
}, {
name: "invalid array variable in dag task accessing aggregate tasks status",
tasks: []PipelineTask{{
Name: "foo",
TaskRef: &TaskRef{Name: "foo-task"},
Params: []Param{{
Name: "tasks-status", Value: ArrayOrString{Type: ParamTypeArray, ArrayVal: []string{"$(tasks.status)"}},
}},
}},
expectedError: apis.FieldError{
Message: `invalid value: pipeline tasks can not refer to execution status of any other pipeline task or aggregate status of tasks`,
Paths: []string{"tasks[0].params[tasks-status].value"},
},
}, {
name: "invalid string variable in finally accessing missing pipelineTask status",
finalTasks: []PipelineTask{{
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/pipelinerun/resources/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func ApplyTaskResults(targets PipelineRunState, resolvedResultRefs ResolvedResul
}
}

//ApplyPipelineTaskContext replaces context variables referring to execution status with the specified status
// ApplyPipelineTaskContext replaces context variables referring to execution status with the specified status
func ApplyPipelineTaskContext(state PipelineRunState, replacements map[string]string) {
for _, resolvedPipelineRunTask := range state {
if resolvedPipelineRunTask.PipelineTask != nil {
Expand Down
22 changes: 22 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,28 @@ func (facts *PipelineRunFacts) GetPipelineTaskStatus() map[string]string {
tStatus[PipelineTaskStatusPrefix+t.PipelineTask.Name+PipelineTaskStatusSuffix] = s
}
}
// initialize aggregate status of all dag tasks to None
aggregateStatus := PipelineTaskStateNone
if facts.checkDAGTasksDone() {
// all dag tasks are done, change the aggregate status to succeeded
// will reset it to failed/skipped if needed
aggregateStatus = v1beta1.PipelineRunReasonSuccessful.String()
for _, t := range facts.State {
if facts.isDAGTask(t.PipelineTask.Name) {
// if any of the dag task failed, change the aggregate status to failed and return
if t.IsConditionStatusFalse() {
aggregateStatus = v1beta1.PipelineRunReasonFailed.String()
break
}
// if any of the dag task skipped, change the aggregate status to completed
// but continue checking for any other failure
if t.Skip(facts) {
aggregateStatus = v1beta1.PipelineRunReasonCompleted.String()
}
}
}
}
tStatus[v1beta1.PipelineTasksAggregateStatus] = aggregateStatus
return tStatus
}

Expand Down
Loading

0 comments on commit d3cc94e

Please sign in to comment.