Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEP-0090: Fan Out TaskRuns #4990

Merged
merged 1 commit into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions docs/matrix.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ weight: 11
- [Results](#results)
- [Specifying Results in a Matrix](#specifying-results-in-a-matrix)
- [Results from fanned out PipelineTasks](#results-from-fanned-out-pipelinetasks)
- [Fan Out](#fan-out)
- [`PipelineTasks` with `Tasks`](#pipelinetasks-with-tasks)

## Overview

Expand Down Expand Up @@ -134,3 +136,73 @@ Consuming `Results` from previous `TaskRuns` or `Runs` in a `Matrix`, which woul
Consuming `Results` from fanned out `PipelineTasks` will not be in the supported in the initial iteration
of `Matrix`. Supporting consuming `Results` from fanned out `PipelineTasks` will be revisited after array
and object `Results` are supported.

## Fan Out

### `PipelineTasks` with `Tasks`

When a `PipelineTask` has a `Task` and a `Matrix`, the `Task` will be executed in parallel `TaskRuns` with
substitutions from combinations of `Parameters`.

In the example below, nine `TaskRuns` are created with combinations of platforms ("linux", "mac", "windows")
and browsers ("chrome", "safari", "firefox").

```yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: platform-browsers
annotations:
description: |
A task that does something cool with platforms and browsers
spec:
params:
- name: platform
- name: browser
steps:
- name: echo
image: alpine
script: |
echo "$(params.platform) and $(params.browser)"
---
# run platform-browsers task with:
# platforms: linux, mac, windows
# browsers: chrome, safari, firefox
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
generateName: matrixed-pr-
spec:
serviceAccountName: 'default'
pipelineSpec:
tasks:
- name: platforms-and-browsers
matrix:
- name: platform
value:
- linux
- mac
- windows
- name: browser
value:
- chrome
- safari
- firefox
taskRef:
name: platform-browsers
```

```shell
$ tkn taskruns list

NAME STARTED DURATION STATUS
matrixed-pr-6lvzk-platforms-and-browsers-8 11 seconds ago 7 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-6 12 seconds ago 7 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-7 12 seconds ago 9 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-4 12 seconds ago 7 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-5 12 seconds ago 6 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-3 13 seconds ago 7 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-1 13 seconds ago 8 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-2 13 seconds ago 8 seconds Succeeded
matrixed-pr-6lvzk-platforms-and-browsers-0 13 seconds ago 8 seconds Succeeded
```
18 changes: 10 additions & 8 deletions docs/pipelineruns.md
Original file line number Diff line number Diff line change
Expand Up @@ -1012,17 +1012,19 @@ Task Runs:

The name of the `TaskRuns` and `Runs` owned by a `PipelineRun` are univocally associated to the owning resource.
If a `PipelineRun` resource is deleted and created with the same name, the child `TaskRuns` will be created with the
same name as before. The base format of the name is `<pipelinerun-name>-<pipelinetask-name>`. The name may vary
according the logic of [`kmeta.ChildName`](https://pkg.go.dev/github.com/knative/pkg/kmeta#ChildName).
same name as before. The base format of the name is `<pipelinerun-name>-<pipelinetask-name>`. If the `PipelineTask`
has a `Matrix`, the name will have an int suffix with format `<pipelinerun-name>-<pipelinetask-name>-<combination-id>`.
The name may vary according the logic of [`kmeta.ChildName`](https://pkg.go.dev/github.com/knative/pkg/kmeta#ChildName).

Some examples:

| `PipelineRun` Name | `PipelineTask` Name | `TaskRun` Name |
|--------------------------|------------------------------|--------------------|
| pipeline-run | task1 | pipeline-run-task1 |
| pipeline-run | task2-0123456789-0123456789-0123456789-0123456789-0123456789 | pipeline-runee4a397d6eab67777d4e6f9991cd19e6-task2-0123456789-0 |
| pipeline-run-0123456789-0123456789-0123456789-0123456789 | task3 | pipeline-run-0123456789-0123456789-0123456789-0123456789-task3 |
| pipeline-run-0123456789-0123456789-0123456789-0123456789 | task2-0123456789-0123456789-0123456789-0123456789-0123456789 | pipeline-run-0123456789-012345607ad8c7aac5873cdfabe472a68996b5c |
| `PipelineRun` Name | `PipelineTask` Name | `TaskRun` Names |
|----------------------------------------------------------|--------------------------------------------------------------|----------------------------------------------------------------------------------------|
| pipeline-run | task1 | pipeline-run-task1 |
| pipeline-run | task2-0123456789-0123456789-0123456789-0123456789-0123456789 | pipeline-runee4a397d6eab67777d4e6f9991cd19e6-task2-0123456789-0 |
| pipeline-run-0123456789-0123456789-0123456789-0123456789 | task3 | pipeline-run-0123456789-0123456789-0123456789-0123456789-task3 |
| pipeline-run-0123456789-0123456789-0123456789-0123456789 | task2-0123456789-0123456789-0123456789-0123456789-0123456789 | pipeline-run-0123456789-012345607ad8c7aac5873cdfabe472a68996b5c |
| pipeline-run | task4 (with 2x2 `Matrix`) | pipeline-run-task1-0, pipeline-run-task1-2, pipeline-run-task1-3, pipeline-run-task1-4 |

## Cancelling a `PipelineRun`

Expand Down
5 changes: 3 additions & 2 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,15 +310,16 @@ func (pt *PipelineTask) validateMatrix(ctx context.Context) (errs *apis.FieldErr
}

func (pt *PipelineTask) validateMatrixCombinationsCount(ctx context.Context) (errs *apis.FieldError) {
matrixCombinationsCount := pt.getMatrixCombinationsCount()
matrixCombinationsCount := pt.GetMatrixCombinationsCount()
maxMatrixCombinationsCount := config.FromContextOrDefaults(ctx).Defaults.DefaultMaxMatrixCombinationsCount
if matrixCombinationsCount > maxMatrixCombinationsCount {
errs = errs.Also(apis.ErrOutOfBoundsValue(matrixCombinationsCount, 0, maxMatrixCombinationsCount, "matrix"))
}
return errs
}

func (pt *PipelineTask) getMatrixCombinationsCount() int {
// GetMatrixCombinationsCount returns the count of combinations of Parameters generated from the Matrix in PipelineTask.
func (pt *PipelineTask) GetMatrixCombinationsCount() int {
if len(pt.Matrix) == 0 {
return 0
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/pipeline/v1beta1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ func TestPipelineTask_validateMatrix(t *testing.T) {
}
}

func TestPipelineTask_getMatrixCombinationsCount(t *testing.T) {
func TestPipelineTask_GetMatrixCombinationsCount(t *testing.T) {
tests := []struct {
name string
pt *PipelineTask
Expand Down Expand Up @@ -860,8 +860,8 @@ func TestPipelineTask_getMatrixCombinationsCount(t *testing.T) {
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if d := cmp.Diff(tt.matrixCombinationsCount, tt.pt.getMatrixCombinationsCount()); d != "" {
t.Errorf("PipelineTask.getMatrixCombinationsCount() errors diff %s", diff.PrintWantGot(d))
if d := cmp.Diff(tt.matrixCombinationsCount, tt.pt.GetMatrixCombinationsCount()); d != "" {
t.Errorf("PipelineTask.GetMatrixCombinationsCount() errors diff %s", diff.PrintWantGot(d))
}
})
}
Expand Down
49 changes: 40 additions & 9 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"path/filepath"
"reflect"
"strconv"
"strings"
"time"

Expand All @@ -38,6 +39,7 @@ import (
listersv1alpha1 "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1"
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1"
resourcelisters "github.com/tektoncd/pipeline/pkg/client/resource/listers/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/matrix"
"github.com/tektoncd/pipeline/pkg/pipelinerunmetrics"
tknreconciler "github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
Expand Down Expand Up @@ -688,7 +690,8 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
if rprt == nil || rprt.Skip(pipelineRunFacts).IsSkipped || rprt.IsFinallySkipped(pipelineRunFacts).IsSkipped {
continue
}
if rprt.IsCustomTask() {
switch {
case rprt.IsCustomTask():
if rprt.IsFinalTask(pipelineRunFacts) {
rprt.Run, err = c.createRun(ctx, rprt, pr, getFinallyTaskRunTimeout)
} else {
Expand All @@ -698,16 +701,27 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
recorder.Eventf(pr, corev1.EventTypeWarning, "RunCreationFailed", "Failed to create Run %q: %v", rprt.RunName, err)
return fmt.Errorf("error creating Run called %s for PipelineTask %s from PipelineRun %s: %w", rprt.RunName, rprt.PipelineTask.Name, pr.Name, err)
}
} else {
case rprt.IsMatrixed():
if rprt.IsFinalTask(pipelineRunFacts) {
rprt.TaskRuns, err = c.createTaskRuns(ctx, rprt, pr, as.StorageBasePath(pr), getFinallyTaskRunTimeout)
} else {
rprt.TaskRuns, err = c.createTaskRuns(ctx, rprt, pr, as.StorageBasePath(pr), getTaskRunTimeout)
}
if err != nil {
recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunsCreationFailed", "Failed to create TaskRuns %q: %v", rprt.TaskRunNames, err)
return fmt.Errorf("error creating TaskRuns called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunNames, rprt.PipelineTask.Name, pr.Name, err)
}
default:
if rprt.IsFinalTask(pipelineRunFacts) {
rprt.TaskRun, err = c.createTaskRun(ctx, rprt, pr, as.StorageBasePath(pr), getFinallyTaskRunTimeout)
rprt.TaskRun, err = c.createTaskRun(ctx, rprt.TaskRunName, nil, rprt, pr, as.StorageBasePath(pr), getFinallyTaskRunTimeout)
} else {
rprt.TaskRun, err = c.createTaskRun(ctx, rprt, pr, as.StorageBasePath(pr), getTaskRunTimeout)
rprt.TaskRun, err = c.createTaskRun(ctx, rprt.TaskRunName, nil, rprt, pr, as.StorageBasePath(pr), getTaskRunTimeout)
}
if err != nil {
recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
}

}
}
return nil
Expand Down Expand Up @@ -750,10 +764,24 @@ func (c *Reconciler) updateRunsStatusDirectly(pr *v1beta1.PipelineRun) error {

type getTimeoutFunc func(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask, c clock.PassiveClock) *metav1.Duration

func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string, getTimeoutFunc getTimeoutFunc) (*v1beta1.TaskRun, error) {
func (c *Reconciler) createTaskRuns(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string, getTimeoutFunc getTimeoutFunc) ([]*v1beta1.TaskRun, error) {
var taskRuns []*v1beta1.TaskRun
matrixCombinations := matrix.FanOut(rprt.PipelineTask.Matrix).ToMap()
for i, taskRunName := range rprt.TaskRunNames {
params := matrixCombinations[strconv.Itoa(i)]
taskRun, err := c.createTaskRun(ctx, taskRunName, params, rprt, pr, storageBasePath, getTimeoutFunc)
jerop marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
taskRuns = append(taskRuns, taskRun)
}
return taskRuns, nil
}

func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, params []v1beta1.Param, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string, getTimeoutFunc getTimeoutFunc) (*v1beta1.TaskRun, error) {
logger := logging.FromContext(ctx)

tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(rprt.TaskRunName)
tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(taskRunName)
if tr != nil {
// Don't modify the lister cache's copy.
tr = tr.DeepCopy()
Expand All @@ -767,16 +795,19 @@ func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.Resolved

rprt.PipelineTask = resources.ApplyPipelineTaskContexts(rprt.PipelineTask)
taskRunSpec := pr.GetTaskRunSpec(rprt.PipelineTask.Name)
if len(params) == 0 {
params = rprt.PipelineTask.Params
}
tr = &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: rprt.TaskRunName,
Name: taskRunName,
Namespace: pr.Namespace,
OwnerReferences: []metav1.OwnerReference{*kmeta.NewControllerRef(pr)},
Labels: combineTaskRunAndTaskSpecLabels(pr, rprt.PipelineTask),
Annotations: combineTaskRunAndTaskSpecAnnotations(pr, rprt.PipelineTask),
},
Spec: v1beta1.TaskRunSpec{
Params: rprt.PipelineTask.Params,
Params: params,
ServiceAccountName: taskRunSpec.TaskServiceAccountName,
Timeout: getTimeoutFunc(ctx, pr, rprt, c.Clock),
PodTemplate: taskRunSpec.TaskPodTemplate,
Expand All @@ -803,7 +834,7 @@ func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.Resolved
}

resources.WrapSteps(&tr.Spec, rprt.PipelineTask, rprt.ResolvedTaskResources.Inputs, rprt.ResolvedTaskResources.Outputs, storageBasePath)
logger.Infof("Creating a new TaskRun object %s for pipeline task %s", rprt.TaskRunName, rprt.PipelineTask.Name)
logger.Infof("Creating a new TaskRun object %s for pipeline task %s", taskRunName, rprt.PipelineTask.Name)
return c.PipelineClientSet.TektonV1beta1().TaskRuns(pr.Namespace).Create(ctx, tr, metav1.CreateOptions{})
}

Expand Down
Loading