Skip to content

Commit

Permalink
Introduce InternalTektonResultType as a ResultType
Browse files Browse the repository at this point in the history
In light of tektoncd#3087 the need for a ResultType that is not exposed
as a TaskRunResult or PipelineResourceResult arises.
In tektoncd#3087, a Step can emit a result indicating a Step timeout
has occurred.
This is a result that should not be exposed hence  the need for a
new ResultType called InternalTektonResultType.
This commit ensures results of this type are filtered out.

Introducing an InternalTektonResultType ensures a future proof
solution to internal results that should not be exposed.
Aside from the example in tektoncd#3087, a present candidate is the
result written out by a Step containing a "StartedAt" key.
Currently this result is filtered out with a specific function.
Marking it as an InternalTektonResultTypes now allows for
this result to automatically be filtered out.
  • Loading branch information
Peaorl committed Aug 26, 2020
1 parent fb296e6 commit d202ac5
Show file tree
Hide file tree
Showing 6 changed files with 439 additions and 398 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
TaskRunResultType ResultType = "TaskRunResult"
// PipelineResourceResultType default pipeline result value
PipelineResourceResultType ResultType = "PipelineResourceResult"
// InternalTektonResultType default internal tekton result value
InternalTektonResultType ResultType = "InternalTektonResult"
// UnknownResultType default unknown result type value
UnknownResultType ResultType = ""
)
Expand Down
10 changes: 6 additions & 4 deletions pkg/entrypoint/entrypointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func (e Entrypointer) Go() error {
// *but* we write postfile to make next steps bail too.
e.WritePostFile(e.PostFile, err)
output = append(output, v1beta1.PipelineResourceResult{
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
ResultType: v1beta1.InternalTektonResultType,
})

return err
Expand All @@ -114,8 +115,9 @@ func (e Entrypointer) Go() error {
e.Args = append([]string{e.Entrypoint}, e.Args...)
}
output = append(output, v1beta1.PipelineResourceResult{
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
ResultType: v1beta1.InternalTektonResultType,
})

err := e.Runner.Run(e.Args...)
Expand Down
120 changes: 81 additions & 39 deletions pkg/pod/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package pod

import (
"encoding/json"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -97,27 +96,51 @@ func SidecarsReady(podStatus corev1.PodStatus) bool {
}

// MakeTaskRunStatus returns a TaskRunStatus based on the Pod's status.
func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, taskSpec v1beta1.TaskSpec) v1beta1.TaskRunStatus {
func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, taskSpec v1beta1.TaskSpec) (v1beta1.TaskRunStatus, error) {
trs := &tr.Status
if trs.GetCondition(apis.ConditionSucceeded) == nil || trs.GetCondition(apis.ConditionSucceeded).Status == corev1.ConditionUnknown {
// If the taskRunStatus doesn't exist yet, it's because we just started running
MarkStatusRunning(trs, v1beta1.TaskRunReasonRunning.String(), "Not all Steps in the Task have finished executing")
}

// Complete if we did not find a step that is not complete, or the pod is in a definitely complete phase
complete := areStepsComplete(pod) || pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed

if complete {
updateCompletedTaskRun(trs, pod)
} else {
updateIncompleteTaskRun(trs, pod)
}

trs.PodName = pod.Name
trs.Steps = []v1beta1.StepState{}
trs.Sidecars = []v1beta1.SidecarState{}

var err error

for _, s := range pod.Status.ContainerStatuses {
if IsContainerStep(s.Name) {
if s.State.Terminated != nil && len(s.State.Terminated.Message) != 0 {
message, time, err := removeStartInfoFromTerminationMessage(s)

var results []v1beta1.PipelineResourceResult
results, err = termination.ParseMessage(s.State.Terminated.Message)
if err != nil {
logger.Errorf("error setting the start time of step %q in taskrun %q: %w", s.Name, tr.Name, err)
}
if time != nil {
s.State.Terminated.StartedAt = *time
s.State.Terminated.Message = message
logger.Errorf("termination message could not be parsed as JSON: %v", err)
} else {
//Further processing if the termination message is JSON formatted
var time *metav1.Time
time, err = extractStartedAtTimeFromResults(results)
if err != nil {
logger.Errorf("error setting the start time of step %q in taskrun %q: %v", s.Name, tr.Name, err)
}
if time != nil {
s.State.Terminated.StartedAt = *time
}
if tr.IsSuccessful() {
taskResults, pipelineResourceResults := filterResultsAndResources(results)
trs.TaskRunResults = append(trs.TaskRunResults, taskResults...)
trs.ResourcesResult = append(trs.ResourcesResult, pipelineResourceResults...)
}
}
}
trs.Steps = append(trs.Steps, v1beta1.StepState{
Expand All @@ -135,51 +158,70 @@ func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev
})
}
}

// Complete if we did not find a step that is not complete, or the pod is in a definitely complete phase
complete := areStepsComplete(pod) || pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed

if complete {
updateCompletedTaskRun(trs, pod)
} else {
updateIncompleteTaskRun(trs, pod)
}
trs.TaskRunResults = removeDuplicateResults(trs.TaskRunResults)

// Sort step states according to the order specified in the TaskRun spec's steps.
trs.Steps = sortTaskRunStepOrder(trs.Steps, taskSpec.Steps)

return *trs
return *trs, err
}

func filterResultsAndResources(results []v1beta1.PipelineResourceResult) ([]v1beta1.TaskRunResult, []v1beta1.PipelineResourceResult) {
var taskResults []v1beta1.TaskRunResult
var pipelineResourceResults []v1beta1.PipelineResourceResult
for _, r := range results {
switch r.ResultType {
case v1beta1.TaskRunResultType:
taskRunResult := v1beta1.TaskRunResult{
Name: r.Key,
Value: r.Value,
}
taskResults = append(taskResults, taskRunResult)
case v1beta1.InternalTektonResultType:
// Internal messages are ignored because they're not used as external result
continue
case v1beta1.PipelineResourceResultType:
fallthrough
default:
pipelineResourceResults = append(pipelineResourceResults, r)
}
}

return taskResults, pipelineResourceResults
}

// removeStartInfoFromTerminationMessage searches for a result called "StartedAt" in the JSON-formatted
// termination message of a step and returns the values to use for sets State.Terminated if it's
// found. The "StartedAt" result is also removed from the list of results in the container status.
func removeStartInfoFromTerminationMessage(s corev1.ContainerStatus) (string, *metav1.Time, error) {
r, err := termination.ParseMessage(s.State.Terminated.Message)
if err != nil {
return "", nil, fmt.Errorf("termination message could not be parsed as JSON: %w", err)
func removeDuplicateResults(taskRunResult []v1beta1.TaskRunResult) []v1beta1.TaskRunResult {
if len(taskRunResult) == 0 {
return nil
}

uniq := make([]v1beta1.TaskRunResult, 0)
latest := make(map[string]v1beta1.TaskRunResult, 0)
for _, res := range taskRunResult {
if _, seen := latest[res.Name]; !seen {
uniq = append(uniq, res)
}
latest[res.Name] = res
}
for i, res := range uniq {
uniq[i] = latest[res.Name]
}
for index, result := range r {
return uniq
}

func extractStartedAtTimeFromResults(results []v1beta1.PipelineResourceResult) (*metav1.Time, error) {

for _, result := range results {
if result.Key == "StartedAt" {
t, err := time.Parse(timeFormat, result.Value)
if err != nil {
return "", nil, fmt.Errorf("could not parse time value %q in StartedAt field: %w", result.Value, err)
return nil, fmt.Errorf("could not parse time value %q in StartedAt field: %w", result.Value, err)
}
message := ""
startedAt := metav1.NewTime(t)
// remove the entry for the starting time
r = append(r[:index], r[index+1:]...)
if len(r) == 0 {
message = ""
} else if bytes, err := json.Marshal(r); err != nil {
return "", nil, fmt.Errorf("error marshalling remaining results back into termination message: %w", err)
} else {
message = string(bytes)
}
return message, &startedAt, nil
return &startedAt, nil
}
}
return "", nil, nil
return nil, nil
}

func updateCompletedTaskRun(trs *v1beta1.TaskRunStatus, pod *corev1.Pod) {
Expand Down
Loading

0 comments on commit d202ac5

Please sign in to comment.