Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring log writer. #709

Merged
merged 1 commit into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions pkg/cmd/pipelinerun/log_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/tektoncd/cli/pkg/cli"
"github.com/tektoncd/cli/pkg/cmd/taskrun"
"github.com/tektoncd/cli/pkg/helper/log"
"github.com/tektoncd/cli/pkg/helper/pipelinerun"
"github.com/tektoncd/cli/pkg/helper/pods/stream"
trh "github.com/tektoncd/cli/pkg/helper/taskrun"
Expand All @@ -42,15 +43,7 @@ type LogReader struct {
Tasks []string
}

// Log is the data gets written to the log channel
type Log struct {
Pipeline string
Task string
Step string
Log string
}

func (lr *LogReader) Read() (<-chan Log, <-chan error, error) {
func (lr *LogReader) Read() (<-chan log.Log, <-chan error, error) {
tkn := lr.Clients.Tekton
pr, err := tkn.TektonV1alpha1().PipelineRuns(lr.Ns).Get(lr.Run, metav1.GetOptions{})
if err != nil {
Expand All @@ -64,8 +57,8 @@ func (lr *LogReader) Read() (<-chan Log, <-chan error, error) {

}

func (lr *LogReader) readLiveLogs(pr *v1alpha1.PipelineRun) (<-chan Log, <-chan error, error) {
logC := make(chan Log)
func (lr *LogReader) readLiveLogs(pr *v1alpha1.PipelineRun) (<-chan log.Log, <-chan error, error) {
logC := make(chan log.Log)
errC := make(chan error)

go func() {
Expand Down Expand Up @@ -103,7 +96,7 @@ func (lr *LogReader) readLiveLogs(pr *v1alpha1.PipelineRun) (<-chan Log, <-chan
return logC, errC, nil
}

func (lr *LogReader) readAvailableLogs(pr *v1alpha1.PipelineRun) (<-chan Log, <-chan error, error) {
func (lr *LogReader) readAvailableLogs(pr *v1alpha1.PipelineRun) (<-chan log.Log, <-chan error, error) {
if err := lr.waitUntilAvailable(10); err != nil {
return nil, nil, err
}
Expand All @@ -118,7 +111,7 @@ func (lr *LogReader) readAvailableLogs(pr *v1alpha1.PipelineRun) (<-chan Log, <-
ordered := trh.SortTasksBySpecOrder(pl.Spec.Tasks, pr.Status.TaskRuns)
taskRuns := trh.Filter(ordered, lr.Tasks)

logC := make(chan Log)
logC := make(chan log.Log)
errC := make(chan error)

go func() {
Expand Down Expand Up @@ -185,7 +178,7 @@ func (lr *LogReader) waitUntilAvailable(timeout time.Duration) error {
}
}

func pipeLogs(logC chan<- Log, errC chan<- error, tlr *taskrun.LogReader) {
func pipeLogs(logC chan<- log.Log, errC chan<- error, tlr *taskrun.LogReader) {
tlogC, terrC, err := tlr.Read()
if err != nil {
errC <- err
Expand All @@ -199,7 +192,7 @@ func pipeLogs(logC chan<- Log, errC chan<- error, tlr *taskrun.LogReader) {
tlogC = nil
continue
}
logC <- Log{Task: l.Task, Step: l.Step, Log: l.Log}
logC <- log.Log{Task: l.Task, Step: l.Step, Log: l.Log}

case e, ok := <-terrC:
if !ok {
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/pipelinerun/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/spf13/cobra"
"github.com/tektoncd/cli/pkg/cli"
"github.com/tektoncd/cli/pkg/helper/log"
"github.com/tektoncd/cli/pkg/helper/options"
prhelper "github.com/tektoncd/cli/pkg/helper/pipelinerun"
"github.com/tektoncd/cli/pkg/helper/pods"
Expand Down Expand Up @@ -108,7 +109,7 @@ func Run(opts *options.LogOptions) error {
return err
}

NewLogWriter().Write(opts.Stream, logC, errC)
log.NewLogWriter(log.LogTypePipeline).Write(opts.Stream, logC, errC)

return nil
}
Expand Down
24 changes: 9 additions & 15 deletions pkg/cmd/taskrun/log_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pkg/errors"
"github.com/tektoncd/cli/pkg/cli"
"github.com/tektoncd/cli/pkg/helper/log"
"github.com/tektoncd/cli/pkg/helper/pods"
"github.com/tektoncd/cli/pkg/helper/pods/stream"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
Expand All @@ -40,13 +41,6 @@ func (s *step) hasStarted() bool {
return s.state.Waiting == nil
}

//Log data to write on log channel
type Log struct {
Task string
Step string
Log string
}

type LogReader struct {
Task string
Run string
Expand All @@ -60,7 +54,7 @@ type LogReader struct {
Steps []string
}

func (lr *LogReader) Read() (<-chan Log, <-chan error, error) {
func (lr *LogReader) Read() (<-chan log.Log, <-chan error, error) {
tkn := lr.Clients.Tekton
tr, err := tkn.TektonV1alpha1().TaskRuns(lr.Ns).Get(lr.Run, metav1.GetOptions{})
if err != nil {
Expand All @@ -72,7 +66,7 @@ func (lr *LogReader) Read() (<-chan Log, <-chan error, error) {
return lr.readLogs(tr)
}

func (lr *LogReader) readLogs(tr *v1alpha1.TaskRun) (<-chan Log, <-chan error, error) {
func (lr *LogReader) readLogs(tr *v1alpha1.TaskRun) (<-chan log.Log, <-chan error, error) {
if lr.Follow {
return lr.readLiveLogs()
}
Expand All @@ -97,7 +91,7 @@ func (lr *LogReader) formTaskName(tr *v1alpha1.TaskRun) {
lr.Task = fmt.Sprintf("Task %d", lr.Number)
}

func (lr *LogReader) readLiveLogs() (<-chan Log, <-chan error, error) {
func (lr *LogReader) readLiveLogs() (<-chan log.Log, <-chan error, error) {
tr, err := lr.waitUntilPodNameAvailable(10)
if err != nil {
return nil, nil, err
Expand All @@ -119,7 +113,7 @@ func (lr *LogReader) readLiveLogs() (<-chan Log, <-chan error, error) {
return logC, errC, err
}

func (lr *LogReader) readAvailableLogs(tr *v1alpha1.TaskRun) (<-chan Log, <-chan error, error) {
func (lr *LogReader) readAvailableLogs(tr *v1alpha1.TaskRun) (<-chan log.Log, <-chan error, error) {
if !tr.HasStarted() {
return nil, nil, fmt.Errorf("task %s has not started yet", lr.Task)
}
Expand Down Expand Up @@ -149,8 +143,8 @@ func (lr *LogReader) readAvailableLogs(tr *v1alpha1.TaskRun) (<-chan Log, <-chan
return logC, errC, nil
}

func (lr *LogReader) readStepsLogs(steps []*step, pod *pods.Pod, follow bool) (<-chan Log, <-chan error) {
logC := make(chan Log)
func (lr *LogReader) readStepsLogs(steps []*step, pod *pods.Pod, follow bool) (<-chan log.Log, <-chan error) {
logC := make(chan log.Log)
errC := make(chan error)

go func() {
Expand All @@ -174,10 +168,10 @@ func (lr *LogReader) readStepsLogs(steps []*step, pod *pods.Pod, follow bool) (<
case l, ok := <-podC:
if !ok {
podC = nil
logC <- Log{Task: lr.Task, Step: step.name, Log: "EOFLOG"}
logC <- log.Log{Task: lr.Task, Step: step.name, Log: "EOFLOG"}
continue
}
logC <- Log{Task: lr.Task, Step: step.name, Log: l.Log}
logC <- log.Log{Task: lr.Task, Step: step.name, Log: l.Log}

case e, ok := <-perrC:
if !ok {
Expand Down
59 changes: 0 additions & 59 deletions pkg/cmd/taskrun/log_writer.go

This file was deleted.

3 changes: 2 additions & 1 deletion pkg/cmd/taskrun/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/spf13/cobra"
"github.com/tektoncd/cli/pkg/cli"
"github.com/tektoncd/cli/pkg/helper/log"
"github.com/tektoncd/cli/pkg/helper/options"
"github.com/tektoncd/cli/pkg/helper/pods"
trlist "github.com/tektoncd/cli/pkg/helper/taskrun/list"
Expand Down Expand Up @@ -117,7 +118,7 @@ func Run(opts *options.LogOptions) error {
return err
}

NewLogWriter().Write(opts.Stream, logC, errC)
log.NewLogWriter(log.LogTypeTask).Write(opts.Stream, logC, errC)
return nil
}

Expand Down
28 changes: 28 additions & 0 deletions pkg/helper/log/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright © 2019 The Tekton Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package log

const (
LogTypePipeline = "pipeline"
LogTypeTask = "task"
)

// Log represents data to write on log channel
type Log struct {
Pipeline string
Task string
Step string
Log string
}
22 changes: 16 additions & 6 deletions pkg/cmd/pipelinerun/log_writer.go → pkg/helper/log/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pipelinerun
package log

import (
"fmt"
Expand All @@ -21,17 +21,21 @@ import (
"github.com/tektoncd/cli/pkg/formatted"
)

// LogWriter helps logging pod"s log
type LogWriter struct {
fmt *formatted.Color
fmt *formatted.Color
logType string
}

//NewLogWriter returns the new instance of LogWriter
func NewLogWriter() *LogWriter {
// NewLogWriter returns the new instance of LogWriter
func NewLogWriter(logType string) *LogWriter {
return &LogWriter{
fmt: formatted.NewColor(),
fmt: formatted.NewColor(),
logType: logType,
}
}

// Write formatted pod's logs
func (lw *LogWriter) Write(s *cli.Stream, logC <-chan Log, errC <-chan error) {
for logC != nil || errC != nil {
select {
Expand All @@ -46,7 +50,13 @@ func (lw *LogWriter) Write(s *cli.Stream, logC <-chan Log, errC <-chan error) {
continue
}

lw.fmt.Rainbow.Fprintf(l.Step, s.Out, "[%s : %s] ", l.Task, l.Step)
switch lw.logType {
case LogTypePipeline:
lw.fmt.Rainbow.Fprintf(l.Step, s.Out, "[%s : %s] ", l.Task, l.Step)
case LogTypeTask:
lw.fmt.Rainbow.Fprintf(l.Step, s.Out, "[%s] ", l.Step)
}

fmt.Fprintf(s.Out, "%s\n", l.Log)
case e, ok := <-errC:
if !ok {
Expand Down