From ceefc0f928559d818d3ae708d60a713bf994c090 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Wed, 11 May 2022 18:31:39 -0500 Subject: [PATCH 1/2] enhance(local executor): print to stdout via client field By printing to a stdout field on the local executor, we can allow the (planned) tests to capture stdout and make sure the executor is behaving correctly. --- cmd/vela-worker/exec.go | 1 + cmd/vela-worker/worker.go | 1 + executor/local/api.go | 3 +-- executor/local/build.go | 43 +++++++++++++++++++-------------------- executor/local/local.go | 19 +++++++++++++++++ executor/local/opts.go | 23 +++++++++++++++++++++ executor/local/service.go | 3 +-- executor/local/stage.go | 7 +++---- executor/local/step.go | 3 +-- executor/setup.go | 4 ++++ 10 files changed, 75 insertions(+), 32 deletions(-) diff --git a/cmd/vela-worker/exec.go b/cmd/vela-worker/exec.go index 62aa1198..c5093f3f 100644 --- a/cmd/vela-worker/exec.go +++ b/cmd/vela-worker/exec.go @@ -69,6 +69,7 @@ func (w *Worker) exec(index int) error { // https://godoc.org/github.com/go-vela/worker/executor#New _executor, err := executor.New(&executor.Setup{ Logger: logger, + Mock: w.Config.Mock, Driver: w.Config.Executor.Driver, LogMethod: w.Config.Executor.LogMethod, MaxLogSize: w.Config.Executor.MaxLogSize, diff --git a/cmd/vela-worker/worker.go b/cmd/vela-worker/worker.go index 02eb4ff1..32c1ce14 100644 --- a/cmd/vela-worker/worker.go +++ b/cmd/vela-worker/worker.go @@ -46,6 +46,7 @@ type ( // Config represents the worker configuration. Config struct { + Mock bool // Mock should only be true for tests API *API Build *Build CheckIn time.Duration diff --git a/executor/local/api.go b/executor/local/api.go index 2be695a5..3f67a5de 100644 --- a/executor/local/api.go +++ b/executor/local/api.go @@ -7,7 +7,6 @@ package local import ( "context" "fmt" - "os" "time" "github.com/go-vela/types/constants" @@ -193,7 +192,7 @@ func (c *client) CancelBuild() (*library.Build, error) { err = c.DestroyBuild(context.Background()) if err != nil { - fmt.Fprintln(os.Stdout, "unable to destroy build:", err) + fmt.Fprintln(c.stdout, "unable to destroy build:", err) } return b, nil diff --git a/executor/local/build.go b/executor/local/build.go index 233c1c49..8c1f0168 100644 --- a/executor/local/build.go +++ b/executor/local/build.go @@ -7,7 +7,6 @@ package local import ( "context" "fmt" - "os" "sync" "time" @@ -97,7 +96,7 @@ func (c *client) PlanBuild(ctx context.Context) error { } // output init progress to stdout - fmt.Fprintln(os.Stdout, _pattern, "> Inspecting runtime network...") + fmt.Fprintln(c.stdout, _pattern, "> Inspecting runtime network...") // inspect the runtime network for the pipeline network, err := c.Runtime.InspectNetwork(ctx, c.pipeline) @@ -107,7 +106,7 @@ func (c *client) PlanBuild(ctx context.Context) error { } // output the network information to stdout - fmt.Fprintln(os.Stdout, _pattern, string(network)) + fmt.Fprintln(c.stdout, _pattern, string(network)) // create the runtime volume for the pipeline err = c.Runtime.CreateVolume(ctx, c.pipeline) @@ -117,7 +116,7 @@ func (c *client) PlanBuild(ctx context.Context) error { } // output init progress to stdout - fmt.Fprintln(os.Stdout, _pattern, "> Inspecting runtime volume...") + fmt.Fprintln(c.stdout, _pattern, "> Inspecting runtime volume...") // inspect the runtime volume for the pipeline volume, err := c.Runtime.InspectVolume(ctx, c.pipeline) @@ -127,7 +126,7 @@ func (c *client) PlanBuild(ctx context.Context) error { } // output the volume information to stdout - fmt.Fprintln(os.Stdout, _pattern, string(volume)) + fmt.Fprintln(c.stdout, _pattern, string(volume)) return c.err } @@ -162,7 +161,7 @@ func (c *client) AssembleBuild(ctx context.Context) error { } // output init progress to stdout - fmt.Fprintln(os.Stdout, _pattern, "> Preparing service images...") + fmt.Fprintln(c.stdout, _pattern, "> Preparing service images...") // create the services for the pipeline for _, _service := range c.pipeline.Services { @@ -183,11 +182,11 @@ func (c *client) AssembleBuild(ctx context.Context) error { } // output the image information to stdout - fmt.Fprintln(os.Stdout, _pattern, string(image)) + fmt.Fprintln(c.stdout, _pattern, string(image)) } // output init progress to stdout - fmt.Fprintln(os.Stdout, _pattern, "> Preparing stage images...") + fmt.Fprintln(c.stdout, _pattern, "> Preparing stage images...") // create the stages for the pipeline for _, _stage := range c.pipeline.Stages { @@ -206,7 +205,7 @@ func (c *client) AssembleBuild(ctx context.Context) error { } // output init progress to stdout - fmt.Fprintln(os.Stdout, _pattern, "> Preparing step images...") + fmt.Fprintln(c.stdout, _pattern, "> Preparing step images...") // create the steps for the pipeline for _, _step := range c.pipeline.Steps { @@ -229,11 +228,11 @@ func (c *client) AssembleBuild(ctx context.Context) error { } // output the image information to stdout - fmt.Fprintln(os.Stdout, _pattern, string(image)) + fmt.Fprintln(c.stdout, _pattern, string(image)) } // output a new line for readability to stdout - fmt.Fprintln(os.Stdout, "") + fmt.Fprintln(c.stdout, "") // assemble runtime build just before any containers execute c.err = c.Runtime.AssembleBuild(ctx, c.pipeline) @@ -353,14 +352,14 @@ func (c *client) StreamBuild(ctx context.Context) error { streams, streamCtx := errgroup.WithContext(ctx) defer func() { - fmt.Fprintln(os.Stdout, "waiting for stream functions to return") + fmt.Fprintln(c.stdout, "waiting for stream functions to return") err := streams.Wait() if err != nil { - fmt.Fprintln(os.Stdout, "error in a stream request:", err) + fmt.Fprintln(c.stdout, "error in a stream request:", err) } - fmt.Fprintln(os.Stdout, "all stream functions have returned") + fmt.Fprintln(c.stdout, "all stream functions have returned") }() // allow the runtime to do log/event streaming setup at build-level @@ -374,11 +373,11 @@ func (c *client) StreamBuild(ctx context.Context) error { select { case req := <-c.streamRequests: streams.Go(func() error { - fmt.Fprintf(os.Stdout, "[%s: %s] > Streaming container '%s'...\n", req.Key, req.Container.Name, req.Container.ID) + fmt.Fprintf(c.stdout, "[%s: %s] > Streaming container '%s'...\n", req.Key, req.Container.Name, req.Container.ID) err := req.Stream(streamCtx, req.Container) if err != nil { - fmt.Fprintln(os.Stdout, "error streaming:", err) + fmt.Fprintln(c.stdout, "error streaming:", err) } return nil @@ -399,7 +398,7 @@ func (c *client) DestroyBuild(ctx context.Context) error { err = c.Runtime.RemoveBuild(ctx, c.pipeline) if err != nil { // output the error information to stdout - fmt.Fprintln(os.Stdout, "unable to destroy runtime build:", err) + fmt.Fprintln(c.stdout, "unable to destroy runtime build:", err) } }() @@ -414,7 +413,7 @@ func (c *client) DestroyBuild(ctx context.Context) error { err = c.DestroyStep(ctx, _step) if err != nil { // output the error information to stdout - fmt.Fprintln(os.Stdout, "unable to destroy step:", err) + fmt.Fprintln(c.stdout, "unable to destroy step:", err) } } @@ -429,7 +428,7 @@ func (c *client) DestroyBuild(ctx context.Context) error { err = c.DestroyStage(ctx, _stage) if err != nil { // output the error information to stdout - fmt.Fprintln(os.Stdout, "unable to destroy stage:", err) + fmt.Fprintln(c.stdout, "unable to destroy stage:", err) } } @@ -439,7 +438,7 @@ func (c *client) DestroyBuild(ctx context.Context) error { err = c.DestroyService(ctx, _service) if err != nil { // output the error information to stdout - fmt.Fprintln(os.Stdout, "unable to destroy service:", err) + fmt.Fprintln(c.stdout, "unable to destroy service:", err) } } @@ -447,14 +446,14 @@ func (c *client) DestroyBuild(ctx context.Context) error { err = c.Runtime.RemoveVolume(ctx, c.pipeline) if err != nil { // output the error information to stdout - fmt.Fprintln(os.Stdout, "unable to destroy runtime volume:", err) + fmt.Fprintln(c.stdout, "unable to destroy runtime volume:", err) } // remove the runtime network for the pipeline err = c.Runtime.RemoveNetwork(ctx, c.pipeline) if err != nil { // output the error information to stdout - fmt.Fprintln(os.Stdout, "unable to destroy runtime network:", err) + fmt.Fprintln(c.stdout, "unable to destroy runtime network:", err) } return err diff --git a/executor/local/local.go b/executor/local/local.go index 5fb45c3e..c974d9e7 100644 --- a/executor/local/local.go +++ b/executor/local/local.go @@ -5,6 +5,7 @@ package local import ( + "os" "reflect" "sync" @@ -33,9 +34,24 @@ type ( user *library.User err error streamRequests chan message.StreamRequest + + // internal field partially exported for tests + stdout *os.File + mockStdoutReader *os.File + } + + // MockedClient is for internal use to facilitate testing the local executor. + MockedClient interface { + MockStdout() *os.File } ) +// MockStdout is for internal use to facilitate testing the local executor. +// MockStdout returns a reader over a mocked Stdout. +func (c *client) MockStdout() *os.File { + return c.mockStdoutReader +} + // equal returns true if the other client is the equivalent. func Equal(a, b *client) bool { // handle any nil comparisons @@ -64,6 +80,9 @@ func New(opts ...Opt) (*client, error) { // create new local client c := new(client) + // Add stdout by default + c.stdout = os.Stdout + // instantiate streamRequests channel (which may be overridden using withStreamRequests()). c.streamRequests = make(chan message.StreamRequest) diff --git a/executor/local/opts.go b/executor/local/opts.go index d4b0ba55..78ff1513 100644 --- a/executor/local/opts.go +++ b/executor/local/opts.go @@ -6,6 +6,7 @@ package local import ( "fmt" + "os" "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime" @@ -121,6 +122,28 @@ func WithVersion(version string) Opt { } } +// WithMockStdout adds a mock stdout writer to the client if mock is true. +// If mock is true, then you must use a goroutine to read from +// MockStdout as quickly as possible, or writing to stdout will hang. +func WithMockStdout(mock bool) Opt { + return func(c *client) error { + if !mock { + return nil + } + + // New() sets c.stdout = os.stdout, replace it if a mock is required. + reader, writer, err := os.Pipe() + if err != nil { + return err + } + + c.mockStdoutReader = reader + c.stdout = writer + + return nil + } +} + // withStreamRequests sets the streamRequests channel in the executor client for Linux // (primarily used for tests). func withStreamRequests(s chan message.StreamRequest) Opt { diff --git a/executor/local/service.go b/executor/local/service.go index eba3ee96..cf465039 100644 --- a/executor/local/service.go +++ b/executor/local/service.go @@ -8,7 +8,6 @@ import ( "bufio" "context" "fmt" - "os" "time" "github.com/go-vela/worker/internal/message" @@ -125,7 +124,7 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err // scan entire container output for scanner.Scan() { // ensure we output to stdout - fmt.Fprintln(os.Stdout, _pattern, scanner.Text()) + fmt.Fprintln(c.stdout, _pattern, scanner.Text()) } return scanner.Err() diff --git a/executor/local/stage.go b/executor/local/stage.go index 668580d1..7aabae3d 100644 --- a/executor/local/stage.go +++ b/executor/local/stage.go @@ -7,7 +7,6 @@ package local import ( "context" "fmt" - "os" "sync" "github.com/go-vela/types/pipeline" @@ -23,7 +22,7 @@ func (c *client) CreateStage(ctx context.Context, s *pipeline.Stage) error { _pattern := fmt.Sprintf(stagePattern, c.init.Name, c.init.Name) // output init progress to stdout - fmt.Fprintln(os.Stdout, _pattern, "> Preparing step images for stage", s.Name, "...") + fmt.Fprintln(c.stdout, _pattern, "> Preparing step images for stage", s.Name, "...") // create the steps for the stage for _, _step := range s.Steps { @@ -43,7 +42,7 @@ func (c *client) CreateStage(ctx context.Context, s *pipeline.Stage) error { } // output the image information to stdout - fmt.Fprintln(os.Stdout, _pattern, string(image)) + fmt.Fprintln(c.stdout, _pattern, string(image)) } return nil @@ -121,7 +120,7 @@ func (c *client) DestroyStage(ctx context.Context, s *pipeline.Stage) error { // destroy the step err = c.DestroyStep(ctx, _step) if err != nil { - fmt.Fprintln(os.Stdout, "unable to destroy step: ", err) + fmt.Fprintln(c.stdout, "unable to destroy step: ", err) } } diff --git a/executor/local/step.go b/executor/local/step.go index 27c6bcac..6a195f4b 100644 --- a/executor/local/step.go +++ b/executor/local/step.go @@ -8,7 +8,6 @@ import ( "bufio" "context" "fmt" - "os" "time" "github.com/go-vela/types/constants" @@ -164,7 +163,7 @@ func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error // scan entire container output for scanner.Scan() { // ensure we output to stdout - fmt.Fprintln(os.Stdout, _pattern, scanner.Text()) + fmt.Fprintln(c.stdout, _pattern, scanner.Text()) } return scanner.Err() diff --git a/executor/setup.go b/executor/setup.go index ba1dc615..3044aa37 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -31,6 +31,9 @@ type Setup struct { // Executor Configuration + // Mock should only be true for tests. + Mock bool + // specifies the executor driver to use Driver string // specifies the executor method used to publish logs @@ -106,6 +109,7 @@ func (s *Setup) Local() (Engine, error) { local.WithUser(s.User), local.WithVelaClient(s.Client), local.WithVersion(s.Version), + local.WithMockStdout(s.Mock), ) } From 0a0f0bd5e11ce5ebf787beccb5dfcfa3dc4bb911 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Thu, 12 May 2022 16:04:41 -0500 Subject: [PATCH 2/2] tests: add test for WithMockStdout --- executor/local/opts_test.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/executor/local/opts_test.go b/executor/local/opts_test.go index f8fd1dc6..ad98be49 100644 --- a/executor/local/opts_test.go +++ b/executor/local/opts_test.go @@ -331,3 +331,39 @@ func TestLocal_Opt_WithVersion(t *testing.T) { }) } } + +func TestLocal_Opt_WithMockStdout(t *testing.T) { + // setup tests + tests := []struct { + name string + mock bool + wantNil bool + }{ + { + name: "standard", + mock: false, + wantNil: true, + }, + { + name: "mocked", + mock: true, + wantNil: false, + }, + } + + // run tests + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _engine, err := New( + WithMockStdout(test.mock), + ) + if err != nil { + t.Errorf("unable to create local engine: %v", err) + } + + if !reflect.DeepEqual(_engine.MockStdout() == nil, test.wantNil) { + t.Errorf("WithMockStdout is %v, wantNil = %v", _engine.MockStdout() == nil, test.wantNil) + } + }) + } +}