From 0b23e042656813faae135a8b7093ea6f263b105d Mon Sep 17 00:00:00 2001
From: Pradeep Kumar
Date: Tue, 13 Aug 2019 15:28:49 +0530
Subject: [PATCH] Wait for logs if pipeline is running

The pipeline logs command now waits for the logs if the pipeline is
still running, and prints a message if there are no logs to show.

Fixes: #117

Signed-off-by: Pradeep Kumar
---
 go.sum                            |   2 -
 pkg/cmd/pipelinerun/log_reader.go |  64 +++++++-
 pkg/cmd/pipelinerun/log_test.go   | 264 +++++++++++++++++++++++++++++-
 pkg/cmd/pipelinerun/logs.go       |   1 +
 vendor/modules.txt                |   2 +-
 5 files changed, 328 insertions(+), 5 deletions(-)

diff --git a/go.sum b/go.sum
index 1295fa96c..e38a1d934 100644
--- a/go.sum
+++ b/go.sum
@@ -215,8 +215,6 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
 github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
-github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s=
-github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
 github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
 github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
diff --git a/pkg/cmd/pipelinerun/log_reader.go b/pkg/cmd/pipelinerun/log_reader.go
index 6f39fb96e..eb42e3db6 100644
--- a/pkg/cmd/pipelinerun/log_reader.go
+++ b/pkg/cmd/pipelinerun/log_reader.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/tektoncd/cli/pkg/cli"
 	"github.com/tektoncd/cli/pkg/cmd/taskrun"
@@ -25,7 +26,9 @@ import (
 	"github.com/tektoncd/cli/pkg/helper/pods/stream"
 	trh "github.com/tektoncd/cli/pkg/helper/taskrun"
 	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 )
 
@@ -33,6 +36,7 @@ type LogReader struct {
 	Run      string
 	Ns       string
 	Clients  *cli.Clients
 	Streamer stream.NewStreamerFunc
+	Stream   *cli.Stream
 	AllSteps bool
 	Follow   bool
 	Tasks    []string
@@ -90,14 +94,21 @@ func (lr *LogReader) readLiveLogs(pr *v1alpha1.PipelineRun) (<-chan Log, <-chan
 		}
 		wg.Wait()
+
+		if pr.Status.Conditions[0].Status == corev1.ConditionFalse {
+			errC <- fmt.Errorf(pr.Status.Conditions[0].Message)
+		}
 	}()
 
 	return logC, errC, nil
 }
 
 func (lr *LogReader) readAvailableLogs(pr *v1alpha1.PipelineRun) (<-chan Log, <-chan error, error) {
-	tkn := lr.Clients.Tekton
+	if err := lr.waitUntilAvailable(10); err != nil {
+		return nil, nil, err
+	}
 
+	tkn := lr.Clients.Tekton
 	pl, err := tkn.TektonV1alpha1().Pipelines(lr.Ns).Get(pr.Spec.PipelineRef.Name, metav1.GetOptions{})
 	if err != nil {
 		return nil, nil, fmt.Errorf(err.Error())
 	}
@@ -121,11 +132,58 @@ func (lr *LogReader) readAvailableLogs(pr *v1alpha1.PipelineRun) (<-chan Log, <-
 			pipeLogs(logC, errC, tlr)
 		}
 
+		if pr.Status.Conditions[0].Status == corev1.ConditionFalse {
+			errC <- fmt.Errorf(pr.Status.Conditions[0].Message)
+		}
 	}()
 
 	return logC, errC, nil
 }
 
+// waitUntilAvailable waits while the status of the run is still unknown:
+// if the run status is unknown, it opens a watch on the run and keeps
+// checking the status until it changes to true|false or the timeout
+// is reached
+func (lr *LogReader) waitUntilAvailable(timeout time.Duration) error {
+	var first = true
+	opts := metav1.ListOptions{
+		FieldSelector: fields.OneTermEqualSelector("metadata.name", lr.Run).String(),
+	}
+	tkn := lr.Clients.Tekton
+	run, err := tkn.TektonV1alpha1().PipelineRuns(lr.Ns).Get(lr.Run, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	if empty(run.Status) {
+		return nil
+	}
+	if run.Status.Conditions[0].Status != corev1.ConditionUnknown {
+		return nil
+	}
+
+	watchRun, err := tkn.TektonV1alpha1().PipelineRuns(lr.Ns).Watch(opts)
+	if err != nil {
+		return err
+	}
+	for {
+		select {
+		case event := <-watchRun.ResultChan():
+			if event.Object.(*v1alpha1.PipelineRun).IsDone() {
+				watchRun.Stop()
+				return nil
+			}
+			if first {
+				first = false
+				fmt.Fprintln(lr.Stream.Out, "Pipeline still running ...")
+			}
+		case <-time.After(timeout * time.Second):
+			watchRun.Stop()
+			fmt.Fprintln(lr.Stream.Err, "No logs found")
+			return nil
+		}
+	}
+}
+
 func pipeLogs(logC chan<- Log, errC chan<- error, tlr *taskrun.LogReader) {
 	tlogC, terrC, err := tlr.Read()
 	if err != nil {
@@ -151,3 +209,7 @@ func pipeLogs(logC chan<- Log, errC chan<- error, tlr *taskrun.LogReader) {
 		}
 	}
 }
+
+func empty(status v1alpha1.PipelineRunStatus) bool {
+	return len(status.Conditions) == 0
+}
diff --git a/pkg/cmd/pipelinerun/log_test.go b/pkg/cmd/pipelinerun/log_test.go
index 5c0166bc3..2740037d5 100644
--- a/pkg/cmd/pipelinerun/log_test.go
+++ b/pkg/cmd/pipelinerun/log_test.go
@@ -31,6 +31,8 @@ import (
 	pipelinetest "github.com/tektoncd/pipeline/test"
 	tb "github.com/tektoncd/pipeline/test/builder"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/watch"
+	k8stest "k8s.io/client-go/testing"
 	"knative.dev/pkg/apis"
 )
 
@@ -44,7 +46,7 @@ func TestLog_no_pipelinerun_argument(t *testing.T) {
 	}
 }
 
-func TestLog_missing_pipelinerun(t *testing.T) {
+func TestLog_run_not_found(t *testing.T) {
 	pr := []*v1alpha1.PipelineRun{
 		tb.PipelineRun("output-pipeline-1", "ns",
 			tb.PipelineRunLabel("tekton.dev/pipeline", "output-pipeline-1"),
@@ -469,6 +471,266 @@ func TestPipelinerunLog_follow_mode(t *testing.T) {
 	test.AssertOutput(t, expected, output)
 }
 
+func TestLogs_error_log(t *testing.T) {
+	var (
+		pipelineName = "errlogs-pipeline"
+		prName       = "errlogs-run"
+		ns           = "namespace"
+		taskName     = "errlogs-task"
+		errMsg       = "Pipeline tektoncd/errlog-pipeline can't be Run; it contains Tasks that don't exist: Couldn't retrieve Task errlog-tasks: task.tekton.dev errlog-tasks not found"
+	)
+
+	ts := []*v1alpha1.Task{
+		tb.Task(taskName, ns,
+			tb.TaskSpec()),
+	}
+
+	prs := []*v1alpha1.PipelineRun{
+		tb.PipelineRun(prName, ns,
+			tb.PipelineRunLabel("tekton.dev/pipeline", prName),
+			tb.PipelineRunSpec(pipelineName),
+			tb.PipelineRunStatus(
+				tb.PipelineRunStatusCondition(apis.Condition{
+					Status:  corev1.ConditionFalse,
+					Message: errMsg,
+				}),
+			),
+		),
+	}
+
+	ps := []*v1alpha1.Pipeline{
+		tb.Pipeline(pipelineName, ns,
+			tb.PipelineSpec(
+				tb.PipelineTask(taskName, taskName),
+			),
+		),
+	}
+	cs, _ := test.SeedTestData(t, pipelinetest.Data{PipelineRuns: prs, Pipelines: ps, Tasks: ts})
+	prlo := logOpts(prName, ns, cs, fake.Streamer([]fake.Log{}), false, false)
+	output, err := fetchLogs(prlo)
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+	test.AssertOutput(t, errMsg+"\n", output)
+}
+
+func TestLogs_nologs(t *testing.T) {
+	var (
+		pipelineName = "nologs-pipeline"
+		prName       = "nologs-run"
+		ns           = "namespace"
+		taskName     = "nologs-task"
+	)
+
+	prs := []*v1alpha1.PipelineRun{
+		tb.PipelineRun(prName, ns,
+			tb.PipelineRunLabel("tekton.dev/pipeline", prName),
+			tb.PipelineRunSpec(pipelineName),
+			tb.PipelineRunStatus(
+				tb.PipelineRunStatusCondition(apis.Condition{
+					Status:  corev1.ConditionUnknown,
+					Message: "Running",
+				}),
+			),
+		),
+	}
+
+	ps := []*v1alpha1.Pipeline{
+		tb.Pipeline(pipelineName, ns,
+			tb.PipelineSpec(
+				tb.PipelineTask(taskName, taskName),
+			),
+		),
+	}
+	cs, _ := test.SeedTestData(t, pipelinetest.Data{PipelineRuns: prs, Pipelines: ps})
+	prlo := logOpts(prName, ns, cs, fake.Streamer([]fake.Log{}), false, false)
+	output, err := fetchLogs(prlo)
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+	test.AssertOutput(t, "No logs found\n", output)
+}
+
+func TestLog_run_failed_with_and_without_follow(t *testing.T) {
+	var (
+		pipelineName = "fail-pipeline"
+		prName       = "fail-run"
+		ns           = "namespace"
+		taskName     = "fail-task"
+		failMessage  = "Failed because I wanted"
+	)
+
+	prs := []*v1alpha1.PipelineRun{
+		tb.PipelineRun(prName, ns,
+			tb.PipelineRunLabel("tekton.dev/pipeline", prName),
+			tb.PipelineRunSpec(pipelineName),
+			tb.PipelineRunStatus(
+				tb.PipelineRunStatusCondition(apis.Condition{
+					Type:    apis.ConditionSucceeded,
+					Status:  corev1.ConditionFalse,
+					Message: failMessage,
+				}),
+			),
+		),
+	}
+
+	ps := []*v1alpha1.Pipeline{
+		tb.Pipeline(pipelineName, ns,
+			tb.PipelineSpec(
+				tb.PipelineTask(taskName, taskName),
+			),
+		),
+	}
+	cs, _ := test.SeedTestData(t, pipelinetest.Data{PipelineRuns: prs, Pipelines: ps})
+
+	// follow mode disabled
+	prlo := logOpts(prName, ns, cs, fake.Streamer([]fake.Log{}), false, false)
+	output, err := fetchLogs(prlo)
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+	test.AssertOutput(t, failMessage+"\n", output)
+
+	// follow mode enabled
+	prlo = logOpts(prName, ns, cs, fake.Streamer([]fake.Log{}), false, true)
+	output, err = fetchLogs(prlo)
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+	test.AssertOutput(t, failMessage+"\n", output)
+}
+
+func TestLog_pipeline_still_running(t *testing.T) {
+	var (
+		pipelineName = "inprogress-pipeline"
+		prName       = "inprogress-run"
+		ns           = "namespace"
+		taskName     = "inprogress-task"
+	)
+
+	initialPRs := []*v1alpha1.PipelineRun{
+		tb.PipelineRun(prName, ns,
+			tb.PipelineRunLabel("tekton.dev/pipeline", prName),
+			tb.PipelineRunSpec(pipelineName),
+			tb.PipelineRunStatus(
+				tb.PipelineRunStatusCondition(apis.Condition{
+					Type:    apis.ConditionSucceeded,
+					Status:  corev1.ConditionUnknown,
+					Message: "Running",
+				}),
+			),
+		),
+	}
+
+	finalPRs := []*v1alpha1.PipelineRun{
+		tb.PipelineRun(prName, ns,
+			tb.PipelineRunStatus(
+				tb.PipelineRunStatusCondition(apis.Condition{
+					Type:    apis.ConditionSucceeded,
+					Status:  corev1.ConditionUnknown,
+					Message: "Running",
+				}),
+			),
+		),
+
+		tb.PipelineRun(prName, ns,
+			tb.PipelineRunStatus(
+				tb.PipelineRunStatusCondition(apis.Condition{
+					Type:    apis.ConditionSucceeded,
+					Status:  corev1.ConditionTrue,
+					Message: "Running",
+				}),
+			),
+		),
+	}
+
+	ps := []*v1alpha1.Pipeline{
+		tb.Pipeline(pipelineName, ns,
+			tb.PipelineSpec(
+				tb.PipelineTask(taskName, taskName),
+			),
+		),
+	}
+	cs, _ := test.SeedTestData(t, pipelinetest.Data{PipelineRuns: initialPRs, Pipelines: ps})
+	watcher := watch.NewFake()
+	cs.Pipeline.PrependWatchReactor("pipelineruns", k8stest.DefaultWatchReactor(watcher, nil))
+	prlo := logOpts(prName, ns, cs, fake.Streamer([]fake.Log{}), false, false)
+
+	updatePR(finalPRs, watcher)
+
+	output, err := fetchLogs(prlo)
+
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+	test.AssertOutput(t, "Pipeline still running ..."+"\n", output)
+}
+
+func TestLog_pipeline_status_done(t *testing.T) {
+	var (
+		pipelineName = "done-pipeline"
+		prName       = "done-run"
+		ns           = "namespace"
+		taskName     = "done-task"
+	)
+
+	prs := []*v1alpha1.PipelineRun{
+		tb.PipelineRun(prName, ns,
+			tb.PipelineRunLabel("tekton.dev/pipeline", prName),
+			tb.PipelineRunSpec(pipelineName),
+			tb.PipelineRunStatus(
+				tb.PipelineRunStatusCondition(apis.Condition{
+					Type:    apis.ConditionSucceeded,
+					Status:  corev1.ConditionUnknown,
+					Message: "Running",
+				}),
+			),
+		),
+	}
+
+	ps := []*v1alpha1.Pipeline{
+		tb.Pipeline(pipelineName, ns,
+			tb.PipelineSpec(
+				tb.PipelineTask(taskName, taskName),
+			),
+		),
+	}
+	cs, _ := test.SeedTestData(t, pipelinetest.Data{PipelineRuns: prs, Pipelines: ps})
+	watcher := watch.NewFake()
+	cs.Pipeline.PrependWatchReactor("pipelineruns", k8stest.DefaultWatchReactor(watcher, nil))
+	prlo := logOpts(prName, ns, cs, fake.Streamer([]fake.Log{}), false, false)
+
+	go func() {
+		time.Sleep(time.Second * 1)
+		for _, pr := range prs {
+			pr.Status.Conditions[0].Status = corev1.ConditionTrue
+			pr.Status.Conditions[0].Message = "completed"
+			watcher.Modify(pr)
+		}
+	}()
+
+	start := time.Now()
+	output, err := fetchLogs(prlo)
+	elapsed := time.Since(start).Seconds()
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+
+	if elapsed > 10 {
+		t.Errorf("Timed out")
+	}
+	test.AssertOutput(t, "", output)
+}
+
+func updatePR(finalRuns []*v1alpha1.PipelineRun, watcher *watch.FakeWatcher) {
+	go func() {
+		for _, pr := range finalRuns {
+			time.Sleep(time.Second * 1)
+			watcher.Modify(pr)
+		}
+	}()
+}
+
 func logOpts(name string, ns string, cs pipelinetest.Clients, streamer stream.NewStreamerFunc, allSteps bool, follow bool, onlyTasks ...string) *LogOptions {
 	p := test.Params{
 		Kube: cs.Kube,
diff --git a/pkg/cmd/pipelinerun/logs.go b/pkg/cmd/pipelinerun/logs.go
index bf35bab97..c56c7ff97 100644
--- a/pkg/cmd/pipelinerun/logs.go
+++ b/pkg/cmd/pipelinerun/logs.go
@@ -95,6 +95,7 @@ func (lo *LogOptions) Run() error {
 		Ns:       lo.Params.Namespace(),
 		Clients:  cs,
 		Streamer: streamer,
+		Stream:   lo.Stream,
 		Follow:   lo.Follow,
 		AllSteps: lo.AllSteps,
 		Tasks:    lo.Tasks,
diff --git a/vendor/modules.txt b/vendor/modules.txt
index e4a5d1c64..2f796fb75 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -426,6 +426,7 @@ k8s.io/api/authorization/v1beta1
 k8s.io/apimachinery/pkg/apis/meta/v1
 k8s.io/apimachinery/pkg/runtime/schema
 k8s.io/apimachinery/pkg/fields
+k8s.io/apimachinery/pkg/watch
 k8s.io/apimachinery/pkg/runtime
 k8s.io/apimachinery/pkg/util/net
 k8s.io/apimachinery/pkg/util/yaml
@@ -433,7 +434,6 @@ k8s.io/apimachinery/pkg/api/errors
 k8s.io/apimachinery/pkg/runtime/serializer/streaming
 k8s.io/apimachinery/pkg/types
 k8s.io/apimachinery/pkg/util/sets
-k8s.io/apimachinery/pkg/watch
 k8s.io/apimachinery/pkg/util/errors
 k8s.io/apimachinery/pkg/util/validation
 k8s.io/apimachinery/pkg/api/equality