From 4bbd79767df8b61a6877621295f456208ce2917e Mon Sep 17 00:00:00 2001 From: hriships Date: Sun, 11 Aug 2019 16:21:50 +0530 Subject: [PATCH] Adds pipeline metrics Often, as a developer or administrator(ops) I want some insights about pipeline behavior in terms of time taken to execute pipleinerun/taskrun, its success or failure ratio, pod latencies etc. At present tekton pipelines has very limited ways to surface such information or it's hard to get those details looking at resources yamls. This patch exposes above mentioned pipelines metrics on '/metrics' endpoint using knative `pkg/metrics` package. User can collect such metrics using prometheus, stackdriver or other supported metrics system. To some extent its solves - https://github.com/tektoncd/pipeline/issues/540 - https://github.com/tektoncd/pipeline/issues/164 --- Gopkg.lock | 4 + cmd/controller/main.go | 2 +- pkg/apis/pipeline/v1alpha1/taskrun_types.go | 16 + .../pipeline/v1alpha1/taskrun_types_test.go | 44 ++- pkg/reconciler/pipelinerun/controller.go | 5 + pkg/reconciler/pipelinerun/metrics.go | 181 ++++++++++ pkg/reconciler/pipelinerun/metrics_test.go | 159 +++++++++ pkg/reconciler/pipelinerun/pipelinerun.go | 36 +- pkg/reconciler/stats.go | 31 -- pkg/reconciler/taskrun/controller.go | 5 + pkg/reconciler/taskrun/metrics.go | 295 +++++++++++++++++ pkg/reconciler/taskrun/metrics_test.go | 308 ++++++++++++++++++ pkg/reconciler/taskrun/taskrun.go | 45 ++- test/builder/pod.go | 30 ++ test/builder/task.go | 7 + 15 files changed, 1116 insertions(+), 52 deletions(-) create mode 100644 pkg/reconciler/pipelinerun/metrics.go create mode 100644 pkg/reconciler/pipelinerun/metrics_test.go delete mode 100644 pkg/reconciler/stats.go create mode 100644 pkg/reconciler/taskrun/metrics.go create mode 100644 pkg/reconciler/taskrun/metrics_test.go diff --git a/Gopkg.lock b/Gopkg.lock index b264b39da12..edfea824257 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1346,6 +1346,9 @@ "github.com/mitchellh/go-homedir", "github.com/mohae/deepcopy", "github.com/tektoncd/plumbing/scripts", + "go.opencensus.io/stats", + "go.opencensus.io/stats/view", + "go.opencensus.io/tag", "go.opencensus.io/trace", "go.uber.org/zap", "go.uber.org/zap/zaptest", @@ -1408,6 +1411,7 @@ "knative.dev/pkg/logging", "knative.dev/pkg/logging/logkey", "knative.dev/pkg/logging/testing", + "knative.dev/pkg/metrics", "knative.dev/pkg/reconciler/testing", "knative.dev/pkg/signals", "knative.dev/pkg/test", diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 034ffd0408e..35dbff6e2a2 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -28,7 +28,7 @@ import ( const ( // ControllerLogKey is the name of the logger for the controller cmd - ControllerLogKey = "controller" + ControllerLogKey = "tekton" ) var ( diff --git a/pkg/apis/pipeline/v1alpha1/taskrun_types.go b/pkg/apis/pipeline/v1alpha1/taskrun_types.go index 4b7ea38dda3..8a3fe0c8145 100644 --- a/pkg/apis/pipeline/v1alpha1/taskrun_types.go +++ b/pkg/apis/pipeline/v1alpha1/taskrun_types.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + "github.com/tektoncd/pipeline/pkg/apis/pipeline" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/apis" @@ -290,4 +291,19 @@ func (tr *TaskRun) GetServiceAccountName() string { name = tr.Spec.DeprecatedServiceAccount } return name + +} + +// IsPartOfPipeline return true if TaskRun is a part of a Pipeline. +// It also return the name of Pipeline and PipelineRun +func (tr *TaskRun) IsPartOfPipeline() (bool, string, string) { + if tr == nil || len(tr.Labels) == 0 { + return false, "", "" + } + + if pl, ok := tr.Labels[pipeline.GroupName+pipeline.PipelineLabelKey]; ok { + return true, pl, tr.Labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] + } + + return false, "", "" } diff --git a/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go b/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go index 0ff772a7800..9e1b8a95a24 100644 --- a/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go +++ b/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go @@ -22,12 +22,12 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + tb "github.com/tektoncd/pipeline/test/builder" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/apis" - - tb "github.com/tektoncd/pipeline/test/builder" ) func TestTaskRun_GetBuildPodRef(t *testing.T) { @@ -179,3 +179,43 @@ func TestTaskRunGetServiceAccountName(t *testing.T) { } } } + +func TestTaskRunIsOfPipelinerun(t *testing.T) { + tests := []struct { + name string + tr *v1alpha1.TaskRun + expectedValue bool + expetectedPipeline string + expetectedPipelineRun string + }{{ + name: "yes", + tr: tb.TaskRun("taskrunname", "testns", + tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineLabelKey, "pipeline"), + tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineRunLabelKey, "pipelinerun"), + ), + expectedValue: true, + expetectedPipeline: "pipeline", + expetectedPipelineRun: "pipelinerun", + }, { + name: "no", + tr: tb.TaskRun("taskrunname", "testns"), + expectedValue: false, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + value, pipeline, pipelineRun := test.tr.IsPartOfPipeline() + if value != test.expectedValue { + t.Fatalf("Expecting %v got %v", test.expectedValue, value) + } + + if pipeline != test.expetectedPipeline { + t.Fatalf("Mismatch in pipeline: got %s expected %s", pipeline, test.expetectedPipeline) + } + + if pipelineRun != test.expetectedPipelineRun { + t.Fatalf("Mismatch in pipelinerun: got %s expected %s", pipelineRun, test.expetectedPipelineRun) + } + }) + } +} diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index c4211d5c163..a39f556deb0 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -56,6 +56,10 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch resourceInformer := resourceinformer.Get(ctx) conditionInformer := conditioninformer.Get(ctx) timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger) + metrics, err := NewRecorder() + if err != nil { + logger.Errorf("Failed to create pipelinerun metrics recorder %v", err) + } opt := reconciler.Options{ KubeClientSet: kubeclientset, @@ -75,6 +79,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch resourceLister: resourceInformer.Lister(), conditionLister: conditionInformer.Lister(), timeoutHandler: timeoutHandler, + metrics: metrics, } impl := controller.NewImpl(c, c.Logger, pipelineRunControllerName) diff --git a/pkg/reconciler/pipelinerun/metrics.go b/pkg/reconciler/pipelinerun/metrics.go new file mode 100644 index 00000000000..6c9ea52c233 --- /dev/null +++ b/pkg/reconciler/pipelinerun/metrics.go @@ -0,0 +1,181 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pipelinerun + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "knative.dev/pkg/metrics" +) + +var ( + prDuration = stats.Float64( + "pipelinerun_duration_seconds", + "The pipelinerun execution time in seconds", + stats.UnitDimensionless) + prDistributions = view.Distribution(10, 30, 60, 300, 900, 1800, 3600, 5400, 10800, 21600, 43200, 86400) + + prCount = stats.Float64("pipelinerun_count", + "number of pipelineruns", + stats.UnitDimensionless) + + runningPRsCount = stats.Float64("running_pipelineruns_count", + "Number of pipelineruns executing currently", + stats.UnitDimensionless) +) + +type Recorder struct { + initialized bool + + pipeline tag.Key + pipelineRun tag.Key + namespace tag.Key + status tag.Key +} + +// NewRecorder creates a new metrics recorder instance +// to log the PipelineRun related metrics +func NewRecorder() (*Recorder, error) { + r := &Recorder{ + initialized: true, + } + + pipeline, err := tag.NewKey("pipeline") + if err != nil { + return nil, err + } + r.pipeline = pipeline + + pipelineRun, err := tag.NewKey("pipelinerun") + if err != nil { + return nil, err + } + r.pipelineRun = pipelineRun + + namespace, err := tag.NewKey("namespace") + if err != nil { + return nil, err + } + r.namespace = namespace + + status, err := tag.NewKey("status") + if err != nil { + return nil, err + } + r.status = status + + err = view.Register( + &view.View{ + Description: prDuration.Description(), + Measure: prDuration, + Aggregation: prDistributions, + TagKeys: []tag.Key{r.pipeline, r.pipelineRun, r.namespace, r.status}, + }, + &view.View{ + Description: prCount.Description(), + Measure: prCount, + Aggregation: view.Count(), + TagKeys: []tag.Key{r.status}, + }, + &view.View{ + Description: runningPRsCount.Description(), + Measure: runningPRsCount, + Aggregation: view.LastValue(), + }, + ) + + if err != nil { + r.initialized = false + return r, err + } + + return r, nil +} + +// DurationAndCount logs the duration of PipelineRun execution and +// count for number of PipelineRuns succeed or failed +// returns an error if its failed to log the metrics +func (r *Recorder) DurationAndCount(pr *v1alpha1.PipelineRun) error { + if !r.initialized { + return fmt.Errorf("ignoring the metrics recording for %s , failed to initialize the metrics recorder", pr.Name) + } + + duration := time.Since(pr.Status.StartTime.Time) + if pr.Status.CompletionTime != nil { + duration = pr.Status.CompletionTime.Sub(pr.Status.StartTime.Time) + } + + status := "success" + if pr.Status.Conditions[0].Status == corev1.ConditionFalse { + status = "failed" + } + + ctx, err := tag.New( + context.Background(), + tag.Insert(r.pipeline, pr.Spec.PipelineRef.Name), + tag.Insert(r.pipelineRun, pr.Name), + tag.Insert(r.namespace, pr.Namespace), + tag.Insert(r.status, status), + ) + + if err != nil { + return err + } + + metrics.Record(ctx, prDuration.M(float64(duration/time.Second))) + metrics.Record(ctx, prCount.M(1)) + + return nil +} + +// RunningPipelineRuns logs the number of PipelineRuns running right now +// returns an error if its failed to log the metrics +func (r *Recorder) RunningPipelineRuns(lister listers.PipelineRunLister) error { + if !r.initialized { + return errors.New("ignoring the metrics recording, failed to initialize the metrics recorder") + } + + prs, err := lister.List(labels.Everything()) + if err != nil { + return fmt.Errorf("failed to list pipelineruns while generating metrics : %v", err) + } + + var runningPRs int + for _, pr := range prs { + if !pr.IsDone() { + runningPRs++ + } + } + + ctx, err := tag.New(context.Background()) + if err != nil { + return err + } + metrics.Record(ctx, runningPRsCount.M(float64(runningPRs))) + + return nil +} diff --git a/pkg/reconciler/pipelinerun/metrics_test.go b/pkg/reconciler/pipelinerun/metrics_test.go new file mode 100644 index 00000000000..73338c061ef --- /dev/null +++ b/pkg/reconciler/pipelinerun/metrics_test.go @@ -0,0 +1,159 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pipelinerun + +import ( + "testing" + "time" + + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + alpha1 "github.com/tektoncd/pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1" + fakepipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/pipelinerun/fake" + tb "github.com/tektoncd/pipeline/test/builder" + corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/apis" + "knative.dev/pkg/metrics/metricstest" + rtesting "knative.dev/pkg/reconciler/testing" +) + +func TestUninitializedMetrics(t *testing.T) { + metrics := Recorder{} + + durationCountError := metrics.DurationAndCount(&v1alpha1.PipelineRun{}) + prCountError := metrics.RunningPipelineRuns(nil) + + assertErrNotNil(durationCountError, "DurationAndCount recording expected to return error but got nil", t) + assertErrNotNil(prCountError, "Current PR count recording expected to return error but got nil", t) +} + +func TestRecordPipelineRunDurationCount(t *testing.T) { + startTime := time.Now() + + testData := []struct { + name string + taskRun *v1alpha1.PipelineRun + expectedTags map[string]string + expectedDuration float64 + expectedCount int64 + }{{ + name: "for_succeeded_pipeline", + taskRun: tb.PipelineRun("pipelinerun-1", "ns", + tb.PipelineRunSpec("pipeline-1"), + tb.PipelineRunStatus( + tb.PipelineRunStartTime(startTime), + tb.PipelineRunCompletionTime(startTime.Add(1*time.Minute)), + tb.PipelineRunStatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + )), + expectedTags: map[string]string{ + "pipeline": "pipeline-1", + "pipelinerun": "pipelinerun-1", + "namespace": "ns", + "status": "success", + }, + expectedDuration: 60, + expectedCount: 1, + }, { + name: "for_failed_pipeline", + taskRun: tb.PipelineRun("pipelinerun-1", "ns", + tb.PipelineRunSpec("pipeline-1"), + tb.PipelineRunStatus( + tb.PipelineRunStartTime(startTime), + tb.PipelineRunCompletionTime(startTime.Add(1*time.Minute)), + tb.PipelineRunStatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }), + )), + expectedTags: map[string]string{ + "pipeline": "pipeline-1", + "pipelinerun": "pipelinerun-1", + "namespace": "ns", + "status": "failed", + }, + expectedDuration: 60, + expectedCount: 1, + }} + + for _, test := range testData { + t.Run(test.name, func(t *testing.T) { + defer unregisterMetrics() + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.DurationAndCount(test.taskRun) + assertErrIsNil(err, "DurationAndCount recording recording got an error", t) + metricstest.CheckDistributionData(t, "pipelinerun_duration_seconds", test.expectedTags, 1, test.expectedDuration, test.expectedDuration) + metricstest.CheckCountData(t, "pipelinerun_count", test.expectedTags, test.expectedCount) + }) + } +} + +func TestRecordRunningPipelineRunsCount(t *testing.T) { + defer unregisterMetrics() + + ctx, _ := rtesting.SetupFakeContext(t) + informer := fakepipelineruninformer.Get(ctx) + addPipelineRun(informer, "pipelinerun-1", "pipeline-1", "ns", corev1.ConditionTrue, t) + addPipelineRun(informer, "pipelinerun-2", "pipeline-2", "ns", corev1.ConditionFalse, t) + addPipelineRun(informer, "pipelinerun-3", "pipeline-3", "ns", corev1.ConditionUnknown, t) + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.RunningPipelineRuns(informer.Lister()) + assertErrIsNil(err, "RunningPrsCount recording expected to return nil but got error", t) + metricstest.CheckLastValueData(t, "running_pipelineruns_count", map[string]string{}, 1) +} + +func addPipelineRun(informer alpha1.PipelineRunInformer, run, pipeline, ns string, status corev1.ConditionStatus, t *testing.T) { + t.Helper() + + err := informer.Informer().GetIndexer().Add(tb.PipelineRun(run, ns, + tb.PipelineRunSpec(pipeline), + tb.PipelineRunStatus( + tb.PipelineRunStatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: status, + }), + ))) + + if err != nil { + t.Errorf("Failed to add the pipelinerun") + } +} + +func assertErrNotNil(err error, message string, t *testing.T) { + t.Helper() + if err == nil { + t.Errorf(message) + } +} + +func assertErrIsNil(err error, message string, t *testing.T) { + t.Helper() + if err != nil { + t.Errorf(message) + } +} + +func unregisterMetrics() { + metricstest.Unregister("pipelinerun_duration_seconds", "pipelinerun_count", "running_pipelineruns_count") +} diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 59113dcb43f..509d0d84bbc 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -100,6 +100,7 @@ type Reconciler struct { tracker tracker.Interface configStore configStore timeoutHandler *reconciler.TimeoutSet + metrics *Recorder } // Check that our Reconciler implements controller.Reconciler @@ -109,7 +110,6 @@ var _ controller.Reconciler = (*Reconciler)(nil) // converge the two. It then updates the Status block of the Pipeline Run // resource with the current status of the resource. func (c *Reconciler) Reconcile(ctx context.Context, key string) error { - c.Logger.Infof("Reconciling %v", time.Now()) // Convert the namespace/name string into a distinct namespace and name @@ -156,6 +156,12 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Logger.Errorf("Failed to update TaskRun status for PipelineRun %s: %v", pr.Name, err) return err } + go func(metrics *Recorder) { + err := metrics.DurationAndCount(pr) + if err != nil { + c.Logger.Warnf("Failed to log the metrics : %v", err) + } + }(c.metrics) } else { if err := c.tracker.Track(pr.GetTaskRunRef(), pr); err != nil { c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err) @@ -171,16 +177,16 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } } - if equality.Semantic.DeepEqual(original.Status, pr.Status) { - // If we didn't change anything then don't call updateStatus. - // This is important because the copy we loaded from the informer's - // cache may be stale and we don't want to overwrite a prior update - // to status with this stale state. - } else if _, err := c.updateStatus(pr); err != nil { - c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err)) - c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update") - return err + var updated bool + if !equality.Semantic.DeepEqual(original.Status, pr.Status) { + if _, err := c.updateStatus(pr); err != nil { + c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err)) + c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update") + return err + } + updated = true } + // Since we are using the status subresource, it is not possible to update // the status and labels/annotations simultaneously. if !reflect.DeepEqual(original.ObjectMeta.Labels, pr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, pr.ObjectMeta.Annotations) { @@ -189,6 +195,16 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update labels/annotations") return err } + updated = true + } + + if updated { + go func(metrics *Recorder) { + err := metrics.RunningPipelineRuns(c.pipelineRunLister) + if err != nil { + c.Logger.Warnf("Failed to log the metrics : %v", err) + } + }(c.metrics) } return err diff --git a/pkg/reconciler/stats.go b/pkg/reconciler/stats.go deleted file mode 100644 index f158989aada..00000000000 --- a/pkg/reconciler/stats.go +++ /dev/null @@ -1,31 +0,0 @@ -/* -Copyright 2019 The Tekton Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package reconciler - -import ( - "go.uber.org/zap" - "knative.dev/pkg/controller" -) - -// MustNewStatsReporter creates a new instance of StatsReporter. Panics if creation fails. -func MustNewStatsReporter(reconciler string, logger *zap.SugaredLogger) controller.StatsReporter { - stats, err := controller.NewStatsReporter(reconciler) - if err != nil { - logger.Fatal("Failed to initialize the stats reporter.", zap.Error(err)) - } - return stats -} diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index e8f37e8e30a..0bd9308709d 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -54,6 +54,10 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch podInformer := podinformer.Get(ctx) resourceInformer := resourceinformer.Get(ctx) timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger) + metrics, err := NewRecorder() + if err != nil { + logger.Errorf("Failed to create taskrun metrics recorder %v", err) + } opt := reconciler.Options{ KubeClientSet: kubeclientset, @@ -71,6 +75,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch resourceLister: resourceInformer.Lister(), timeoutHandler: timeoutHandler, cloudEventClient: cloudeventclient.Get(ctx), + metrics: metrics, } impl := controller.NewImpl(c, c.Logger, taskRunControllerName) diff --git a/pkg/reconciler/taskrun/metrics.go b/pkg/reconciler/taskrun/metrics.go new file mode 100644 index 00000000000..0ec5f3df626 --- /dev/null +++ b/pkg/reconciler/taskrun/metrics.go @@ -0,0 +1,295 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskrun + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "knative.dev/pkg/metrics" +) + +var ( + trDuration = stats.Float64( + "taskrun_duration_seconds", + "The taskrun's execution time in seconds", + stats.UnitDimensionless) + trDistribution = view.Distribution(10, 30, 60, 300, 900, 1800, 3600, 5400, 10800, 21600, 43200, 86400) + + prTRDuration = stats.Float64( + "pipelinerun_taskrun_duration_seconds", + "The pipelinerun's taskrun execution time in seconds", + stats.UnitDimensionless) + prTRLatencyDistribution = view.Distribution(10, 30, 60, 300, 900, 1800, 3600, 5400, 10800, 21600, 43200, 86400) + + trCount = stats.Float64("taskrun_count", + "number of taskruns", + stats.UnitDimensionless) + + runningTRsCount = stats.Float64("running_taskruns_count", + "Number of taskruns executing currently", + stats.UnitDimensionless) + + podLatency = stats.Float64("taskruns_pod_latency", + "scheduling latency for the taskruns pods", + stats.UnitMilliseconds) +) + +type Recorder struct { + initialized bool + + task tag.Key + taskRun tag.Key + namespace tag.Key + status tag.Key + pipeline tag.Key + pipelineRun tag.Key + pod tag.Key +} + +// NewRecorder creates a new metrics recorder instance +// to log the TaskRun related metrics +func NewRecorder() (*Recorder, error) { + r := &Recorder{ + initialized: true, + } + + task, err := tag.NewKey("task") + if err != nil { + return nil, err + } + r.task = task + + taskRun, err := tag.NewKey("taskrun") + if err != nil { + return nil, err + } + r.taskRun = taskRun + + namespace, err := tag.NewKey("namespace") + if err != nil { + return nil, err + } + r.namespace = namespace + + status, err := tag.NewKey("status") + if err != nil { + return nil, err + } + r.status = status + + pipeline, err := tag.NewKey("pipeline") + if err != nil { + return nil, err + } + r.pipeline = pipeline + + pipelineRun, err := tag.NewKey("pipelinerun") + if err != nil { + return nil, err + } + r.pipelineRun = pipelineRun + + pod, err := tag.NewKey("pod") + if err != nil { + return nil, err + } + r.pod = pod + + err = view.Register( + &view.View{ + Description: trDuration.Description(), + Measure: trDuration, + Aggregation: trDistribution, + TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status}, + }, + &view.View{ + Description: prTRDuration.Description(), + Measure: prTRDuration, + Aggregation: prTRLatencyDistribution, + TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun}, + }, + &view.View{ + Description: trCount.Description(), + Measure: trCount, + Aggregation: view.Count(), + TagKeys: []tag.Key{r.status}, + }, + &view.View{ + Description: runningTRsCount.Description(), + Measure: runningTRsCount, + Aggregation: view.LastValue(), + }, + &view.View{ + Description: podLatency.Description(), + Measure: podLatency, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.pod}, + }, + ) + + if err != nil { + r.initialized = false + return r, err + } + + return r, nil +} + +// DurationAndCount logs the duration of TaskRun execution and +// count for number of TaskRuns succeed or failed +// returns an error if its failed to log the metrics +func (r *Recorder) DurationAndCount(tr *v1alpha1.TaskRun) error { + if !r.initialized { + return fmt.Errorf("ignoring the metrics recording for %s , failed to initialize the metrics recorder", tr.Name) + } + + duration := time.Since(tr.Status.StartTime.Time) + if tr.Status.CompletionTime != nil { + duration = tr.Status.CompletionTime.Sub(tr.Status.StartTime.Time) + } + + taskName := "anonymous" + if tr.Spec.TaskRef != nil { + taskName = tr.Spec.TaskRef.Name + } + + status := "success" + if tr.Status.Conditions[0].Status == corev1.ConditionFalse { + status = "failed" + } + + if ok, pipeline, pipelinerun := tr.IsPartOfPipeline(); ok { + ctx, err := tag.New( + context.Background(), + tag.Insert(r.task, taskName), + tag.Insert(r.taskRun, tr.Name), + tag.Insert(r.namespace, tr.Namespace), + tag.Insert(r.status, status), + tag.Insert(r.pipeline, pipeline), + tag.Insert(r.pipelineRun, pipelinerun), + ) + + if err != nil { + return err + } + + stats.Record(ctx, prTRDuration.M(float64(duration/time.Second))) + metrics.Record(ctx, trCount.M(1)) + return nil + } + + ctx, err := tag.New( + context.Background(), + tag.Insert(r.task, taskName), + tag.Insert(r.taskRun, tr.Name), + tag.Insert(r.namespace, tr.Namespace), + tag.Insert(r.status, status), + ) + if err != nil { + return err + } + + metrics.Record(ctx, trDuration.M(float64(duration/time.Second))) + metrics.Record(ctx, trCount.M(1)) + + return nil +} + +// RunningTaskRuns logs the number of TaskRuns running right now +// returns an error if its failed to log the metrics +func (r *Recorder) RunningTaskRuns(lister listers.TaskRunLister) error { + if !r.initialized { + return errors.New("ignoring the metrics recording, failed to initialize the metrics recorder") + } + + trs, err := lister.List(labels.Everything()) + if err != nil { + return err + } + + var runningTrs int + for _, pr := range trs { + if !pr.IsDone() { + runningTrs++ + } + } + + ctx, err := tag.New( + context.Background(), + ) + if err != nil { + return err + } + metrics.Record(ctx, runningTRsCount.M(float64(runningTrs))) + + return nil +} + +// RecordPodLatency logs the duration required to schedule the pod for TaskRun +// returns an error if its failed to log the metrics +func (r *Recorder) RecordPodLatency(pod *corev1.Pod, tr *v1alpha1.TaskRun) error { + if !r.initialized { + return errors.New("ignoring the metrics recording for pod , failed to initialize the metrics recorder") + } + + scheduledTime := getScheduledTime(pod) + if scheduledTime.IsZero() { + return errors.New("pod has never got scheduled") + } + + latency := scheduledTime.Sub(pod.CreationTimestamp.Time) + taskName := "anonymous" + if tr.Spec.TaskRef != nil { + taskName = tr.Spec.TaskRef.Name + } + + ctx, err := tag.New( + context.Background(), + tag.Insert(r.task, taskName), + tag.Insert(r.taskRun, tr.Name), + tag.Insert(r.namespace, tr.Namespace), + tag.Insert(r.pod, pod.Name), + ) + if err != nil { + return err + } + + metrics.Record(ctx, podLatency.M(float64(latency))) + + return nil +} + +func getScheduledTime(pod *corev1.Pod) metav1.Time { + for _, c := range pod.Status.Conditions { + if c.Type == corev1.PodScheduled { + return c.LastTransitionTime + } + } + + return metav1.Time{} +} diff --git a/pkg/reconciler/taskrun/metrics_test.go b/pkg/reconciler/taskrun/metrics_test.go new file mode 100644 index 00000000000..de4e9e53777 --- /dev/null +++ b/pkg/reconciler/taskrun/metrics_test.go @@ -0,0 +1,308 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package taskrun + +import ( + "testing" + "time" + + "github.com/tektoncd/pipeline/pkg/apis/pipeline" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + alpha1 "github.com/tektoncd/pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1" + faketaskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/taskrun/fake" + tb "github.com/tektoncd/pipeline/test/builder" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/apis" + "knative.dev/pkg/metrics/metricstest" + rtesting "knative.dev/pkg/reconciler/testing" +) + +func TestUninitializedMetrics(t *testing.T) { + metrics := Recorder{} + + durationCountError := metrics.DurationAndCount(&v1alpha1.TaskRun{}) + taskrunsCountError := metrics.RunningTaskRuns(nil) + podLatencyError := metrics.RecordPodLatency(nil, nil) + + assertErrNotNil(durationCountError, "DurationCount recording expected to return error but got nil", t) + assertErrNotNil(taskrunsCountError, "Current TaskrunsCount recording expected to return error but got nil", t) + assertErrNotNil(podLatencyError, "Pod Latency recording expected to return error but got nil", t) +} + +func TestRecordTaskrunDurationCount(t *testing.T) { + startTime := time.Now() + + testData := []struct { + name string + taskRun *v1alpha1.TaskRun + expectedTags map[string]string + expectedDuration float64 + expectedCount int64 + }{{ + name: "for_succeeded_task", + taskRun: tb.TaskRun("taskrun-1", "ns", + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + tb.TaskRunStatus( + tb.TaskRunStartTime(startTime), + tb.TaskRunCompletionTime(startTime.Add(1*time.Minute)), + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + )), + expectedTags: map[string]string{ + "task": "task-1", + "taskrun": "taskrun-1", + "namespace": "ns", + "status": "success", + }, + expectedDuration: 60, + expectedCount: 1, + }, { + name: "for_failed_task", + taskRun: tb.TaskRun("taskrun-1", "ns", + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + tb.TaskRunStatus( + tb.TaskRunStartTime(startTime), + tb.TaskRunCompletionTime(startTime.Add(1*time.Minute)), + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }), + )), + expectedTags: map[string]string{ + "task": "task-1", + "taskrun": "taskrun-1", + "namespace": "ns", + "status": "failed", + }, + expectedDuration: 60, + expectedCount: 1, + }} + + for _, test := range testData { + t.Run(test.name, func(t *testing.T) { + defer unregisterMetrics() + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.DurationAndCount(test.taskRun) + assertErrIsNil(err, "DurationAndCount recording got an error", t) + metricstest.CheckDistributionData(t, "taskrun_duration_seconds", test.expectedTags, 1, test.expectedDuration, test.expectedDuration) + metricstest.CheckCountData(t, "taskrun_count", test.expectedTags, test.expectedCount) + }) + } +} + +func TestRecordPipelinerunTaskrunDurationCount(t *testing.T) { + startTime := time.Now() + + testData := []struct { + name string + taskRun *v1alpha1.TaskRun + expectedTags map[string]string + expectedDuration float64 + expectedCount int64 + }{{ + name: "for_succeeded_task", + taskRun: tb.TaskRun("taskrun-1", "ns", + tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineLabelKey, "pipeline-1"), + tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineRunLabelKey, "pipelinerun-1"), + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + tb.TaskRunStatus( + tb.TaskRunStartTime(startTime), + tb.TaskRunCompletionTime(startTime.Add(1*time.Minute)), + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + )), + expectedTags: map[string]string{ + "pipeline": "pipeline-1", + "pipelinerun": "pipelinerun-1", + "task": "task-1", + "taskrun": "taskrun-1", + "namespace": "ns", + "status": "success", + }, + expectedDuration: 60, + expectedCount: 1, + }, { + name: "for_failed_task", + taskRun: tb.TaskRun("taskrun-1", "ns", + tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineLabelKey, "pipeline-1"), + tb.TaskRunLabel(pipeline.GroupName+pipeline.PipelineRunLabelKey, "pipelinerun-1"), + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + tb.TaskRunStatus( + tb.TaskRunStartTime(startTime), + tb.TaskRunCompletionTime(startTime.Add(1*time.Minute)), + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }), + )), + expectedTags: map[string]string{ + "pipeline": "pipeline-1", + "pipelinerun": "pipelinerun-1", + "task": "task-1", + "taskrun": "taskrun-1", + "namespace": "ns", + "status": "failed", + }, + expectedDuration: 60, + expectedCount: 1, + }} + + for _, test := range testData { + t.Run(test.name, func(t *testing.T) { + defer unregisterMetrics() + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.DurationAndCount(test.taskRun) + assertErrIsNil(err, "DurationAndCount recording got an error", t) + metricstest.CheckDistributionData(t, "pipelinerun_taskrun_duration_seconds", test.expectedTags, 1, test.expectedDuration, test.expectedDuration) + metricstest.CheckCountData(t, "taskrun_count", test.expectedTags, test.expectedCount) + }) + } +} + +func TestRecordRunningTaskrunsCount(t *testing.T) { + defer unregisterMetrics() + + ctx, _ := rtesting.SetupFakeContext(t) + informer := faketaskruninformer.Get(ctx) + addTaskruns(informer, "taskrun-1", "task-1", "ns", corev1.ConditionTrue, t) + addTaskruns(informer, "taskrun-2", "task-3", "ns", corev1.ConditionUnknown, t) + addTaskruns(informer, "taskrun-3", "task-3", "ns", corev1.ConditionFalse, t) + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.RunningTaskRuns(informer.Lister()) + assertErrIsNil(err, "RunningTaskRuns recording expected to return nil but got error", t) + metricstest.CheckLastValueData(t, "running_taskruns_count", map[string]string{}, 1) +} + +func TestRecordPodLatency(t *testing.T) { + creationTime := time.Now() + testData := []struct { + name string + pod *corev1.Pod + taskRun *v1alpha1.TaskRun + expectedTags map[string]string + expectedValue float64 + expectingError bool + }{{ + name: "for_scheduled_pod", + pod: tb.Pod("test-taskrun-pod-123456", "foo", + tb.PodCreationTimestamp(creationTime), + tb.PodStatus( + tb.PodStatusConditions(corev1.PodCondition{ + Type: corev1.PodScheduled, + LastTransitionTime: metav1.Time{Time: creationTime.Add(4 * time.Second)}, + }), + )), + taskRun: tb.TaskRun("test-taskrun", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + ), + expectedTags: map[string]string{ + "pod": "test-taskrun-pod-123456", + "task": "task-1", + "taskrun": "test-taskrun", + "namespace": "foo", + }, + expectedValue: 4e+09, + }, { + name: "for_non_scheduled_pod", + pod: tb.Pod("test-taskrun-pod-123456", "foo", + tb.PodCreationTimestamp(creationTime), + ), + taskRun: tb.TaskRun("test-taskrun", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef("task-1"), + ), + ), + expectingError: true, + }} + + for _, td := range testData { + t.Run(td.name, func(t *testing.T) { + defer unregisterMetrics() + + metrics, err := NewRecorder() + assertErrIsNil(err, "Recorder initialization failed", t) + + err = metrics.RecordPodLatency(td.pod, td.taskRun) + if td.expectingError { + assertErrNotNil(err, "Pod Latency recording expected to return error but got nil", t) + return + } + assertErrIsNil(err, "RecordPodLatency recording expected to return nil but got error", t) + metricstest.CheckLastValueData(t, "taskruns_pod_latency", td.expectedTags, td.expectedValue) + }) + } + +} + +func addTaskruns(informer alpha1.TaskRunInformer, taskrun, task, ns string, status corev1.ConditionStatus, t *testing.T) { + err := informer.Informer().GetIndexer().Add(tb.TaskRun(taskrun, ns, + tb.TaskRunSpec( + tb.TaskRunTaskRef(task), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: status, + }), + ))) + + if err != nil { + t.Error("Failed to add the taskrun") + } +} + +func assertErrIsNil(err error, message string, t *testing.T) { + t.Helper() + if err != nil { + t.Errorf(message) + } +} + +func assertErrNotNil(err error, message string, t *testing.T) { + t.Helper() + if err == nil { + t.Errorf(message) + } +} + +func unregisterMetrics() { + metricstest.Unregister("taskrun_duration_seconds", "pipelinerun_taskrun_duration_seconds", "taskrun_count", "running_taskruns_count", "taskruns_pod_latency") +} diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 2d83d041364..9b0337b788b 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -71,6 +71,7 @@ type Reconciler struct { tracker tracker.Interface cache *entrypoint.Cache timeoutHandler *reconciler.TimeoutSet + metrics *Recorder } // Check that our Reconciler implements controller.Reconciler @@ -94,7 +95,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Logger.Infof("task run %q in work queue no longer exists", key) return nil } else if err != nil { - c.Logger.Errorf("Error retreiving TaskRun %q: %s", name, err) + c.Logger.Errorf("Error retrieving TaskRun %q: %s", name, err) return err } @@ -111,6 +112,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } if tr.IsDone() { + c.Logger.Infof("taskrun done : %s \n", tr.Name) var merr *multierror.Error // Try to send cloud events first cloudEventErr := cloudevent.SendCloudEvents(tr, c.cloudEventClient, c.Logger) @@ -134,6 +136,18 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { c.Logger.Errorf("Error stopping sidecars for TaskRun %q: %v", name, err) merr = multierror.Append(merr, err) } + + go func(metrics *Recorder) { + err := metrics.DurationAndCount(tr) + if err != nil { + c.Logger.Warnf("Failed to log the metrics : %v", err) + } + err = metrics.RecordPodLatency(pod, tr) + if err != nil { + c.Logger.Warnf("Failed to log the metrics : %v", err) + } + }(c.metrics) + return merr.ErrorOrNil() } // Reconcile this copy of the task run and then write back any status @@ -146,25 +160,40 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1alpha1.TaskRun) error { - var err error - if equality.Semantic.DeepEqual(original.Status, tr.Status) { + var updated bool + + if !equality.Semantic.DeepEqual(original.Status, tr.Status) { // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the informer's // cache may be stale and we don't want to overwrite a prior update // to status with this stale state. - } else if _, err := c.updateStatus(tr); err != nil { - c.Logger.Warn("Failed to update taskRun status", zap.Error(err)) - return err + if _, err := c.updateStatus(tr); err != nil { + c.Logger.Warn("Failed to update taskRun status", zap.Error(err)) + return err + } + updated = true } + // Since we are using the status subresource, it is not possible to update // the status and labels/annotations simultaneously. - if !reflect.DeepEqual(original.ObjectMeta.Labels, tr.ObjectMeta.Labels) { + if !reflect.DeepEqual(original.ObjectMeta.Labels, tr.ObjectMeta.Labels) || !reflect.DeepEqual(original.ObjectMeta.Annotations, tr.ObjectMeta.Annotations) { if _, err := c.updateLabelsAndAnnotations(tr); err != nil { c.Logger.Warn("Failed to update TaskRun labels/annotations", zap.Error(err)) return err } + updated = true + } + + if updated { + go func(metrics *Recorder) { + err := metrics.RunningTaskRuns(c.taskRunLister) + if err != nil { + c.Logger.Warnf("Failed to log the metrics : %v", err) + } + }(c.metrics) } - return err + + return nil } func (c *Reconciler) getTaskFunc(tr *v1alpha1.TaskRun) (resources.GetTask, v1alpha1.TaskKind) { diff --git a/test/builder/pod.go b/test/builder/pod.go index 8c56c96aeb5..7123ad80d7e 100644 --- a/test/builder/pod.go +++ b/test/builder/pod.go @@ -17,6 +17,8 @@ limitations under the License. package builder import ( + "time" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -27,6 +29,9 @@ type PodOp func(*corev1.Pod) // PodSpecOp is an operation which modifies a PodSpec struct. type PodSpecOp func(*corev1.PodSpec) +// PodStatusOp is an operation which modifies a PodStatus struct. +type PodStatusOp func(status *corev1.PodStatus) + // Pod creates a Pod with default values. // Any number of Pod modifiers can be passed to transform it. func Pod(name, namespace string, ops ...PodOp) *corev1.Pod { @@ -142,3 +147,28 @@ func PodVolumes(volumes ...corev1.Volume) PodSpecOp { spec.Volumes = volumes } } + +// PodCreationTimestamp sets the creation time of the pod +func PodCreationTimestamp(t time.Time) PodOp { + return func(p *corev1.Pod) { + p.CreationTimestamp = metav1.Time{Time: t} + } +} + +// PodStatus creates a PodStatus with default values. +// Any number of PodStatus modifiers can be passed to transform it. +func PodStatus(ops ...PodStatusOp) PodOp { + return func(pod *corev1.Pod) { + podStatus := &pod.Status + for _, op := range ops { + op(podStatus) + } + pod.Status = *podStatus + } +} + +func PodStatusConditions(cond corev1.PodCondition) PodStatusOp { + return func(status *corev1.PodStatus) { + status.Conditions = append(status.Conditions, cond) + } +} diff --git a/test/builder/task.go b/test/builder/task.go index 780690f7052..e133b677fdb 100644 --- a/test/builder/task.go +++ b/test/builder/task.go @@ -335,6 +335,13 @@ func TaskRunStartTime(startTime time.Time) TaskRunStatusOp { } } +// TaskRunCompletionTime sets the start time to the TaskRunStatus. +func TaskRunCompletionTime(completionTime time.Time) TaskRunStatusOp { + return func(s *v1alpha1.TaskRunStatus) { + s.CompletionTime = &metav1.Time{Time: completionTime} + } +} + // TaskRunCloudEvent adds an event to the TaskRunStatus. func TaskRunCloudEvent(target, error string, retryCount int32, condition v1alpha1.CloudEventCondition) TaskRunStatusOp { return func(s *v1alpha1.TaskRunStatus) {