Skip to content

Commit

Permalink
TEP-0090: Fan Out TaskRuns
Browse files Browse the repository at this point in the history
[TEP-0090: Matrix][tep-0090] proposed executing a `PipelineTask` in
parallel `TaskRuns` and `Runs` with substitutions from combinations
of `Parameters` in a `Matrix`.

This change implements the fan out of `TaskRuns` from a `PipelineTask`
with a `Matrix`. The fanned-out `TaskRuns` are executed in parallel.

[tep-0090]: https://github.com/tektoncd/community/blob/main/teps/0090-matrix.md
  • Loading branch information
jerop committed Jun 17, 2022
1 parent d48cfdd commit f0a9c38
Show file tree
Hide file tree
Showing 9 changed files with 600 additions and 45 deletions.
72 changes: 72 additions & 0 deletions docs/matrix.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ weight: 11
- [Results](#results)
- [Specifying Results in a Matrix](#specifying-results-in-a-matrix)
- [Results from fanned out PipelineTasks](#results-from-fanned-out-pipelinetasks)
- [Fan Out](#fan-out)
- [`PipelineTasks` with `Tasks`](#pipelinetasks-with-tasks)

## Overview

Expand Down Expand Up @@ -134,3 +136,73 @@ Consuming `Results` from previous `TaskRuns` or `Runs` in a `Matrix`, which woul
Consuming `Results` from fanned out `PipelineTasks` will not be in the supported in the initial iteration
of `Matrix`. Supporting consuming `Results` from fanned out `PipelineTasks` will be revisited after array
and object `Results` are supported.

## Fan Out

### `PipelineTasks` with `Tasks`

When a `PipelineTask` has a `Task` and a `Matrix`, the `Task` will be executed in parallel `TaskRuns` with
substitutions from combinations of `Parameters`.

In the example below, nine `TaskRuns` are created with combinations of platforms ("linux", "mac", "windows")
and browsers ("chrome", "safari", "firefox").

```yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: platform-browsers
annotations:
description: |
A task that does something cool with platforms and browsers
spec:
params:
- name: platform
- name: browser
steps:
- name: echo
image: alpine
script: |
echo "$(params.platform) and $(params.browser)"
---
# run platform-browsers task with:
# platforms: linux, mac, windows
# browsers: chrome, safari, firefox
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
generateName: matrixed-pr-
spec:
serviceAccountName: 'default'
pipelineSpec:
tasks:
- name: platforms-and-browsers
matrix:
- name: platform
value:
- linux
- mac
- windows
- name: browser
value:
- chrome
- safari
- firefox
taskRef:
name: platform-browsers
```
```shell
$ tkn taskruns list

NAME STARTED DURATION STATUS
matrixed-pr-6lvzk-platforms-and-browsers-8 11 seconds ago 7 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-6 12 seconds ago 7 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-7 12 seconds ago 9 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-4 12 seconds ago 7 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-5 12 seconds ago 6 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-3 13 seconds ago 7 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-1 13 seconds ago 8 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-2 13 seconds ago 8 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-0 13 seconds ago 8 seconds Succeeded
```
18 changes: 10 additions & 8 deletions docs/pipelineruns.md
Original file line number Diff line number Diff line change
Expand Up @@ -1012,17 +1012,19 @@ Task Runs:

The name of the `TaskRuns` and `Runs` owned by a `PipelineRun` are univocally associated to the owning resource.
If a `PipelineRun` resource is deleted and created with the same name, the child `TaskRuns` will be created with the
same name as before. The base format of the name is `<pipelinerun-name>-<pipelinetask-name>`. The name may vary
according the logic of [`kmeta.ChildName`](https://pkg.go.dev/github.com/knative/pkg/kmeta#ChildName).
same name as before. The base format of the name is `<pipelinerun-name>-<pipelinetask-name>`. If the `PipelineTask`
has a `Matrix`, the name will have an int suffix with format `<pipelinerun-name>-<pipelinetask-name>-<combination-id>`.
The name may vary according the logic of [`kmeta.ChildName`](https://pkg.go.dev/github.com/knative/pkg/kmeta#ChildName).

Some examples:

| `PipelineRun` Name | `PipelineTask` Name | `TaskRun` Name |
|--------------------------|------------------------------|--------------------|
| pipeline-run | task1 | pipeline-run-task1 |
| pipeline-run | task2-0123456789-0123456789-0123456789-0123456789-0123456789 | pipeline-runee4a397d6eab67777d4e6f9991cd19e6-task2-0123456789-0 |
| pipeline-run-0123456789-0123456789-0123456789-0123456789 | task3 | pipeline-run-0123456789-0123456789-0123456789-0123456789-task3 |
| pipeline-run-0123456789-0123456789-0123456789-0123456789 | task2-0123456789-0123456789-0123456789-0123456789-0123456789 | pipeline-run-0123456789-012345607ad8c7aac5873cdfabe472a68996b5c |
| `PipelineRun` Name | `PipelineTask` Name | `TaskRun` Names |
|----------------------------------------------------------|--------------------------------------------------------------|----------------------------------------------------------------------------------------|
| pipeline-run | task1 | pipeline-run-task1 |
| pipeline-run | task2-0123456789-0123456789-0123456789-0123456789-0123456789 | pipeline-runee4a397d6eab67777d4e6f9991cd19e6-task2-0123456789-0 |
| pipeline-run-0123456789-0123456789-0123456789-0123456789 | task3 | pipeline-run-0123456789-0123456789-0123456789-0123456789-task3 |
| pipeline-run-0123456789-0123456789-0123456789-0123456789 | task2-0123456789-0123456789-0123456789-0123456789-0123456789 | pipeline-run-0123456789-012345607ad8c7aac5873cdfabe472a68996b5c |
| pipeline-run | task4 (with 2x2 `Matrix`) | pipeline-run-task1-0, pipeline-run-task1-2, pipeline-run-task1-3, pipeline-run-task1-4 |

## Cancelling a `PipelineRun`

Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/config/testdata/config-defaults-empty.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ data:
# default-timeout-minutes contains the default number of
# minutes to use for TaskRun, if none is specified.
default-timeout-minutes: "60" # 60 minutes
# default-max-matrix-combinations-count contains the default maximum number
# of combinations from a Matrix, if none is specified.
default-max-matrix-combinations-count: "256"
5 changes: 3 additions & 2 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,15 +310,16 @@ func (pt *PipelineTask) validateMatrix(ctx context.Context) (errs *apis.FieldErr
}

func (pt *PipelineTask) validateMatrixCombinationsCount(ctx context.Context) (errs *apis.FieldError) {
matrixCombinationsCount := pt.getMatrixCombinationsCount()
matrixCombinationsCount := pt.GetMatrixCombinationsCount()
maxMatrixCombinationsCount := config.FromContextOrDefaults(ctx).Defaults.DefaultMaxMatrixCombinationsCount
if matrixCombinationsCount > maxMatrixCombinationsCount {
errs = errs.Also(apis.ErrOutOfBoundsValue(matrixCombinationsCount, 0, maxMatrixCombinationsCount, "matrix"))
}
return errs
}

func (pt *PipelineTask) getMatrixCombinationsCount() int {
// GetMatrixCombinationsCount returns the count of combinations of Parameters generated from the Matrix in PipelineTask.
func (pt *PipelineTask) GetMatrixCombinationsCount() int {
if len(pt.Matrix) == 0 {
return 0
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/pipeline/v1beta1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ func TestPipelineTask_validateMatrix(t *testing.T) {
}
}

func TestPipelineTask_getMatrixCombinationsCount(t *testing.T) {
func TestPipelineTask_GetMatrixCombinationsCount(t *testing.T) {
tests := []struct {
name string
pt *PipelineTask
Expand Down Expand Up @@ -860,8 +860,8 @@ func TestPipelineTask_getMatrixCombinationsCount(t *testing.T) {
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if d := cmp.Diff(tt.matrixCombinationsCount, tt.pt.getMatrixCombinationsCount()); d != "" {
t.Errorf("PipelineTask.getMatrixCombinationsCount() errors diff %s", diff.PrintWantGot(d))
if d := cmp.Diff(tt.matrixCombinationsCount, tt.pt.GetMatrixCombinationsCount()); d != "" {
t.Errorf("PipelineTask.GetMatrixCombinationsCount() errors diff %s", diff.PrintWantGot(d))
}
})
}
Expand Down
49 changes: 40 additions & 9 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"path/filepath"
"reflect"
"strconv"
"strings"
"time"

Expand All @@ -38,6 +39,7 @@ import (
listersv1alpha1 "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1"
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1"
resourcelisters "github.com/tektoncd/pipeline/pkg/client/resource/listers/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/matrix"
"github.com/tektoncd/pipeline/pkg/pipelinerunmetrics"
tknreconciler "github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
Expand Down Expand Up @@ -688,7 +690,8 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
if rprt == nil || rprt.Skip(pipelineRunFacts).IsSkipped || rprt.IsFinallySkipped(pipelineRunFacts).IsSkipped {
continue
}
if rprt.IsCustomTask() {
switch {
case rprt.IsCustomTask():
if rprt.IsFinalTask(pipelineRunFacts) {
rprt.Run, err = c.createRun(ctx, rprt, pr, getFinallyTaskRunTimeout)
} else {
Expand All @@ -698,16 +701,27 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
recorder.Eventf(pr, corev1.EventTypeWarning, "RunCreationFailed", "Failed to create Run %q: %v", rprt.RunName, err)
return fmt.Errorf("error creating Run called %s for PipelineTask %s from PipelineRun %s: %w", rprt.RunName, rprt.PipelineTask.Name, pr.Name, err)
}
} else {
case rprt.IsMatrixed():
if rprt.IsFinalTask(pipelineRunFacts) {
rprt.TaskRuns, err = c.createTaskRuns(ctx, rprt, pr, as.StorageBasePath(pr), getFinallyTaskRunTimeout)
} else {
rprt.TaskRuns, err = c.createTaskRuns(ctx, rprt, pr, as.StorageBasePath(pr), getTaskRunTimeout)
}
if err != nil {
recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunsCreationFailed", "Failed to create TaskRuns %q: %v", rprt.TaskRunNames, err)
return fmt.Errorf("error creating TaskRuns called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunNames, rprt.PipelineTask.Name, pr.Name, err)
}
default:
if rprt.IsFinalTask(pipelineRunFacts) {
rprt.TaskRun, err = c.createTaskRun(ctx, rprt, pr, as.StorageBasePath(pr), getFinallyTaskRunTimeout)
rprt.TaskRun, err = c.createTaskRun(ctx, rprt.TaskRunName, nil, rprt, pr, as.StorageBasePath(pr), getFinallyTaskRunTimeout)
} else {
rprt.TaskRun, err = c.createTaskRun(ctx, rprt, pr, as.StorageBasePath(pr), getTaskRunTimeout)
rprt.TaskRun, err = c.createTaskRun(ctx, rprt.TaskRunName, nil, rprt, pr, as.StorageBasePath(pr), getTaskRunTimeout)
}
if err != nil {
recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
}

}
}
return nil
Expand Down Expand Up @@ -750,10 +764,24 @@ func (c *Reconciler) updateRunsStatusDirectly(pr *v1beta1.PipelineRun) error {

type getTimeoutFunc func(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask, c clock.PassiveClock) *metav1.Duration

func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string, getTimeoutFunc getTimeoutFunc) (*v1beta1.TaskRun, error) {
func (c *Reconciler) createTaskRuns(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string, getTimeoutFunc getTimeoutFunc) ([]*v1beta1.TaskRun, error) {
var taskRuns []*v1beta1.TaskRun
matrixCombinations := matrix.FanOut(rprt.PipelineTask.Matrix).ToMap()
for i, taskRunName := range rprt.TaskRunNames {
params := matrixCombinations[strconv.Itoa(i)]
taskRun, err := c.createTaskRun(ctx, taskRunName, params, rprt, pr, storageBasePath, getTimeoutFunc)
if err != nil {
return nil, err
}
taskRuns = append(taskRuns, taskRun)
}
return taskRuns, nil
}

func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, params []v1beta1.Param, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string, getTimeoutFunc getTimeoutFunc) (*v1beta1.TaskRun, error) {
logger := logging.FromContext(ctx)

tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(rprt.TaskRunName)
tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(taskRunName)
if tr != nil {
// Don't modify the lister cache's copy.
tr = tr.DeepCopy()
Expand All @@ -767,16 +795,19 @@ func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.Resolved

rprt.PipelineTask = resources.ApplyPipelineTaskContexts(rprt.PipelineTask)
taskRunSpec := pr.GetTaskRunSpec(rprt.PipelineTask.Name)
if len(params) == 0 {
params = rprt.PipelineTask.Params
}
tr = &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: rprt.TaskRunName,
Name: taskRunName,
Namespace: pr.Namespace,
OwnerReferences: []metav1.OwnerReference{*kmeta.NewControllerRef(pr)},
Labels: combineTaskRunAndTaskSpecLabels(pr, rprt.PipelineTask),
Annotations: combineTaskRunAndTaskSpecAnnotations(pr, rprt.PipelineTask),
},
Spec: v1beta1.TaskRunSpec{
Params: rprt.PipelineTask.Params,
Params: params,
ServiceAccountName: taskRunSpec.TaskServiceAccountName,
Timeout: getTimeoutFunc(ctx, pr, rprt, c.Clock),
PodTemplate: taskRunSpec.TaskPodTemplate,
Expand All @@ -803,7 +834,7 @@ func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.Resolved
}

resources.WrapSteps(&tr.Spec, rprt.PipelineTask, rprt.ResolvedTaskResources.Inputs, rprt.ResolvedTaskResources.Outputs, storageBasePath)
logger.Infof("Creating a new TaskRun object %s for pipeline task %s", rprt.TaskRunName, rprt.PipelineTask.Name)
logger.Infof("Creating a new TaskRun object %s for pipeline task %s", taskRunName, rprt.PipelineTask.Name)
return c.PipelineClientSet.TektonV1beta1().TaskRuns(pr.Namespace).Create(ctx, tr, metav1.CreateOptions{})
}

Expand Down
Loading

0 comments on commit f0a9c38

Please sign in to comment.