Skip to content

Commit

Permalink
Filter invalid Pipeline Results from final results in PipelineRun Status
Browse files Browse the repository at this point in the history
Prior to this commit a Pipeline Result that references a Task Result of a failed TaskRun
would end up with a value of the literal variable. For example: if Task "foo" failed,
a Pipeline Result with value of "$(tasks.foo.results.bar)" would end up in the
PipelineRun.Status.Results list with the literal value, "$(tasks.foo.results.bar)".
It was therefore difficult to assess programatically whether results were populated
correctly or were the result of some invalid TaskRun condition.

This commit fixes the bug by filtering out PipelineRun Results that reference failed
TaskRuns. It was quite difficult to follow the flow of execution wrt PipelineRun Results
and so ultimately I had to refactor the whole lot to figure out where the bug was. The
final code is quite a bit shorter than the original and has improved test coverage to
more robustly exercise the behaviour of PipelineResults in various failure scenarios.
  • Loading branch information
Scott authored and tekton-robot committed Jan 12, 2021
1 parent 3acf1e7 commit 6e6b6c8
Show file tree
Hide file tree
Showing 7 changed files with 373 additions and 235 deletions.
14 changes: 14 additions & 0 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,20 @@ references the `outputValue` `Result` emitted by the `calculate-sum` `Task`.

For an end-to-end example, see [`Results` in a `PipelineRun`](../examples/v1beta1/pipelineruns/pipelinerun-results.yaml).

A `Pipeline Result` is not emitted if any of the following are true:
- A `PipelineTask` referenced by the `Pipeline Result` failed. The `PipelineRun` will also
have failed.
- A `PipelineTask` referenced by the `Pipeline Result` was skipped.
- A `PipelineTask` referenced by the `Pipeline Result` didn't emit the referenced `Task Result`. This
should be considered a bug in the `Task` and [may fail a `PipelineTask` in future](https://github.com/tektoncd/pipeline/issues/3497).
- The `Pipeline Result` uses a variable that doesn't point to an actual `PipelineTask`. This will
result in an `InvalidTaskResultReference` validation error during `PipelineRun` execution.
- The `Pipeline Result` uses a variable that doesn't point to an actual result in a `PipelineTask`.
This will cause an `InvalidTaskResultReference` validation error during `PipelineRun` execution.

**Note:** Since a `Pipeline Result` can contain references to multiple `Task Results`, if any of those
`Task Result` references are invalid the entire `Pipeline Result` is not emitted.

## Configuring the `Task` execution order

You can connect `Tasks` in a `Pipeline` so that they execute in a Directed Acyclic Graph (DAG).
Expand Down
47 changes: 9 additions & 38 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"path/filepath"
"reflect"
"strconv"
"strings"
"time"

"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -176,7 +175,15 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
// and may not have had all of the assumed default specified.
pr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))

c.updatePipelineResults(ctx, pr, getPipelineFunc)
if _, pipelineSpec, err := resources.GetPipelineData(ctx, pr, getPipelineFunc); err != nil {
msg := fmt.Sprintf("Failed to get Pipeline Spec to process Pipeline Results for PipelineRun %s/%s: %v", pr.Namespace, pr.Name, err)
logger.Error(msg)
logger.Warnf("An error processing Pipeline Results overwrites existing Succeeded Condition for PipelineRun %s/%s: %v", pr.Namespace, pr.Name, pr.Status.GetCondition(apis.ConditionSucceeded))
pr.Status.MarkFailed(ReasonCouldntGetPipeline, msg)
} else {
pr.Status.PipelineResults = resources.ApplyTaskResultsToPipelineResults(pipelineSpec.Results, pr.Status.TaskRuns)
}

if err := artifacts.CleanupArtifactStorage(ctx, pr, c.KubeClientSet); err != nil {
logger.Errorf("Failed to delete PVC for PipelineRun %s: %v", pr.Name, err)
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
Expand Down Expand Up @@ -317,21 +324,6 @@ func (c *Reconciler) resolvePipelineState(
return pst, nil
}

func (c *Reconciler) updatePipelineResults(ctx context.Context, pr *v1beta1.PipelineRun, getPipelineFunc resources.GetPipeline) {
logger := logging.FromContext(ctx)

_, pipelineSpec, err := resources.GetPipelineData(ctx, pr, getPipelineFunc)
if err != nil {
logger.Errorf("Failed to determine Pipeline spec to use for pipelinerun %s: %v", pr.Name, err)
pr.Status.MarkFailed(ReasonCouldntGetPipeline,
"Error retrieving pipeline for pipelinerun %s/%s: %s",
pr.Namespace, pr.Name, err)
return
}
resolvedResultRefs := resources.ResolvePipelineResultRefs(pr.Status, pipelineSpec.Results)
pr.Status.PipelineResults = getPipelineRunResults(pipelineSpec, resolvedResultRefs)
}

func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, getPipelineFunc resources.GetPipeline) error {
logger := logging.FromContext(ctx)
// We may be reading a version of the object that was stored at an older version
Expand Down Expand Up @@ -622,27 +614,6 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
return nil
}

func getPipelineRunResults(pipelineSpec *v1beta1.PipelineSpec, resolvedResultRefs resources.ResolvedResultRefs) []v1beta1.PipelineRunResult {
var results []v1beta1.PipelineRunResult
stringReplacements := map[string]string{}

for _, resolvedResultRef := range resolvedResultRefs {
replaceTarget := fmt.Sprintf("%s.%s.%s.%s", v1beta1.ResultTaskPart, resolvedResultRef.ResultReference.PipelineTask, v1beta1.ResultResultPart, resolvedResultRef.ResultReference.Result)
stringReplacements[replaceTarget] = resolvedResultRef.Value.StringVal
}
for _, result := range pipelineSpec.Results {
in := result.Value
for k, v := range stringReplacements {
in = strings.Replace(in, fmt.Sprintf("$(%s)", k), v, -1)
}
results = append(results, v1beta1.PipelineRunResult{
Name: result.Name,
Value: in,
})
}
return results
}

func (c *Reconciler) updateTaskRunsStatusDirectly(pr *v1beta1.PipelineRun) error {
for taskRunName := range pr.Status.TaskRuns {
// TODO(dibyom): Add conditionCheck statuses here
Expand Down
75 changes: 75 additions & 0 deletions pkg/reconciler/pipelinerun/resources/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package resources

import (
"fmt"
"strings"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
)

// ApplyParameters applies the params from a PipelineRun.Params to a PipelineSpec.
Expand Down Expand Up @@ -84,6 +87,8 @@ func ApplyTaskResults(targets PipelineRunState, resolvedResultRefs ResolvedResul
}
}

// ApplyWorkspaces replaces workspace variables in the given pipeline spec with their
// concrete values.
func ApplyWorkspaces(p *v1beta1.PipelineSpec, pr *v1beta1.PipelineRun) *v1beta1.PipelineSpec {
p = p.DeepCopy()
replacements := map[string]string{}
Expand Down Expand Up @@ -124,3 +129,73 @@ func replaceParamValues(params []v1beta1.Param, stringReplacements map[string]st
}
return params
}

// ApplyTaskResultsToPipelineResults applies the results of completed TasksRuns to a Pipeline's
// list of PipelineResults, returning the computed set of PipelineRunResults. References to
// non-existent TaskResults or failed TaskRuns result in a PipelineResult being considered invalid
// and omitted from the returned slice. A nil slice is returned if no results are passed in or all
// results are invalid.
func ApplyTaskResultsToPipelineResults(results []v1beta1.PipelineResult, taskRunStatuses map[string]*v1beta1.PipelineRunTaskRunStatus) []v1beta1.PipelineRunResult {
taskStatuses := map[string]*v1beta1.PipelineRunTaskRunStatus{}
for _, trStatus := range taskRunStatuses {
taskStatuses[trStatus.PipelineTaskName] = trStatus
}

var runResults []v1beta1.PipelineRunResult = nil
stringReplacements := map[string]string{}
for _, pipelineResult := range results {
variablesInPipelineResult, _ := v1beta1.GetVarSubstitutionExpressionsForPipelineResult(pipelineResult)
validPipelineResult := true
for _, variable := range variablesInPipelineResult {
if _, isMemoized := stringReplacements[variable]; isMemoized {
continue
}
if resultValue := taskResultValue(variable, taskStatuses); resultValue != nil {
stringReplacements[variable] = *resultValue
} else {
validPipelineResult = false
}
}
if validPipelineResult {
finalValue := pipelineResult.Value
for variable, value := range stringReplacements {
v := fmt.Sprintf("$(%s)", variable)
finalValue = strings.Replace(finalValue, v, value, -1)
}
runResults = append(runResults, v1beta1.PipelineRunResult{
Name: pipelineResult.Name,
Value: finalValue,
})
}
}

return runResults
}

// taskResultValue returns a pointer to the result value for a given task result variable. A nil
// pointer is returned if the variable is invalid for any reason.
func taskResultValue(variable string, taskStatuses map[string]*v1beta1.PipelineRunTaskRunStatus) *string {
variableParts := strings.Split(variable, ".")
if len(variableParts) != 4 || variableParts[0] != "tasks" || variableParts[2] != "results" {
return nil
}

taskName, resultName := variableParts[1], variableParts[3]

status, taskExists := taskStatuses[taskName]
if !taskExists || status.Status == nil {
return nil
}

cond := status.Status.GetCondition(apis.ConditionSucceeded)
if cond == nil || cond.Status != corev1.ConditionTrue {
return nil
}

for _, trResult := range status.Status.TaskRunResults {
if trResult.Name == resultName {
return &trResult.Value
}
}
return nil
}
Loading

0 comments on commit 6e6b6c8

Please sign in to comment.