Skip to content

Commit

Permalink
enhance(local executor): print to stdout via client field
Browse files Browse the repository at this point in the history
By printing to a stdout field on the local executor,
we can allow the (planned) tests to capture stdout
and make sure the executor is behaving correctly.
  • Loading branch information
cognifloyd committed May 12, 2022
1 parent 69101f7 commit ceefc0f
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 32 deletions.
1 change: 1 addition & 0 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (w *Worker) exec(index int) error {
// https://godoc.org/github.com/go-vela/worker/executor#New
_executor, err := executor.New(&executor.Setup{
Logger: logger,
Mock: w.Config.Mock,
Driver: w.Config.Executor.Driver,
LogMethod: w.Config.Executor.LogMethod,
MaxLogSize: w.Config.Executor.MaxLogSize,
Expand Down
1 change: 1 addition & 0 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type (

// Config represents the worker configuration.
Config struct {
Mock bool // Mock should only be true for tests
API *API
Build *Build
CheckIn time.Duration
Expand Down
3 changes: 1 addition & 2 deletions executor/local/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package local
import (
"context"
"fmt"
"os"
"time"

"github.com/go-vela/types/constants"
Expand Down Expand Up @@ -193,7 +192,7 @@ func (c *client) CancelBuild() (*library.Build, error) {

err = c.DestroyBuild(context.Background())
if err != nil {
fmt.Fprintln(os.Stdout, "unable to destroy build:", err)
fmt.Fprintln(c.stdout, "unable to destroy build:", err)
}

return b, nil
Expand Down
43 changes: 21 additions & 22 deletions executor/local/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package local
import (
"context"
"fmt"
"os"
"sync"
"time"

Expand Down Expand Up @@ -97,7 +96,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
}

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Inspecting runtime network...")
fmt.Fprintln(c.stdout, _pattern, "> Inspecting runtime network...")

// inspect the runtime network for the pipeline
network, err := c.Runtime.InspectNetwork(ctx, c.pipeline)
Expand All @@ -107,7 +106,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
}

// output the network information to stdout
fmt.Fprintln(os.Stdout, _pattern, string(network))
fmt.Fprintln(c.stdout, _pattern, string(network))

// create the runtime volume for the pipeline
err = c.Runtime.CreateVolume(ctx, c.pipeline)
Expand All @@ -117,7 +116,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
}

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Inspecting runtime volume...")
fmt.Fprintln(c.stdout, _pattern, "> Inspecting runtime volume...")

// inspect the runtime volume for the pipeline
volume, err := c.Runtime.InspectVolume(ctx, c.pipeline)
Expand All @@ -127,7 +126,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
}

// output the volume information to stdout
fmt.Fprintln(os.Stdout, _pattern, string(volume))
fmt.Fprintln(c.stdout, _pattern, string(volume))

return c.err
}
Expand Down Expand Up @@ -162,7 +161,7 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Preparing service images...")
fmt.Fprintln(c.stdout, _pattern, "> Preparing service images...")

// create the services for the pipeline
for _, _service := range c.pipeline.Services {
Expand All @@ -183,11 +182,11 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// output the image information to stdout
fmt.Fprintln(os.Stdout, _pattern, string(image))
fmt.Fprintln(c.stdout, _pattern, string(image))
}

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Preparing stage images...")
fmt.Fprintln(c.stdout, _pattern, "> Preparing stage images...")

// create the stages for the pipeline
for _, _stage := range c.pipeline.Stages {
Expand All @@ -206,7 +205,7 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Preparing step images...")
fmt.Fprintln(c.stdout, _pattern, "> Preparing step images...")

// create the steps for the pipeline
for _, _step := range c.pipeline.Steps {
Expand All @@ -229,11 +228,11 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// output the image information to stdout
fmt.Fprintln(os.Stdout, _pattern, string(image))
fmt.Fprintln(c.stdout, _pattern, string(image))
}

// output a new line for readability to stdout
fmt.Fprintln(os.Stdout, "")
fmt.Fprintln(c.stdout, "")

// assemble runtime build just before any containers execute
c.err = c.Runtime.AssembleBuild(ctx, c.pipeline)
Expand Down Expand Up @@ -353,14 +352,14 @@ func (c *client) StreamBuild(ctx context.Context) error {
streams, streamCtx := errgroup.WithContext(ctx)

defer func() {
fmt.Fprintln(os.Stdout, "waiting for stream functions to return")
fmt.Fprintln(c.stdout, "waiting for stream functions to return")

err := streams.Wait()
if err != nil {
fmt.Fprintln(os.Stdout, "error in a stream request:", err)
fmt.Fprintln(c.stdout, "error in a stream request:", err)
}

fmt.Fprintln(os.Stdout, "all stream functions have returned")
fmt.Fprintln(c.stdout, "all stream functions have returned")
}()

// allow the runtime to do log/event streaming setup at build-level
Expand All @@ -374,11 +373,11 @@ func (c *client) StreamBuild(ctx context.Context) error {
select {
case req := <-c.streamRequests:
streams.Go(func() error {
fmt.Fprintf(os.Stdout, "[%s: %s] > Streaming container '%s'...\n", req.Key, req.Container.Name, req.Container.ID)
fmt.Fprintf(c.stdout, "[%s: %s] > Streaming container '%s'...\n", req.Key, req.Container.Name, req.Container.ID)

err := req.Stream(streamCtx, req.Container)
if err != nil {
fmt.Fprintln(os.Stdout, "error streaming:", err)
fmt.Fprintln(c.stdout, "error streaming:", err)
}

return nil
Expand All @@ -399,7 +398,7 @@ func (c *client) DestroyBuild(ctx context.Context) error {
err = c.Runtime.RemoveBuild(ctx, c.pipeline)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy runtime build:", err)
fmt.Fprintln(c.stdout, "unable to destroy runtime build:", err)
}
}()

Expand All @@ -414,7 +413,7 @@ func (c *client) DestroyBuild(ctx context.Context) error {
err = c.DestroyStep(ctx, _step)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy step:", err)
fmt.Fprintln(c.stdout, "unable to destroy step:", err)
}
}

Expand All @@ -429,7 +428,7 @@ func (c *client) DestroyBuild(ctx context.Context) error {
err = c.DestroyStage(ctx, _stage)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy stage:", err)
fmt.Fprintln(c.stdout, "unable to destroy stage:", err)
}
}

Expand All @@ -439,22 +438,22 @@ func (c *client) DestroyBuild(ctx context.Context) error {
err = c.DestroyService(ctx, _service)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy service:", err)
fmt.Fprintln(c.stdout, "unable to destroy service:", err)
}
}

// remove the runtime volume for the pipeline
err = c.Runtime.RemoveVolume(ctx, c.pipeline)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy runtime volume:", err)
fmt.Fprintln(c.stdout, "unable to destroy runtime volume:", err)
}

// remove the runtime network for the pipeline
err = c.Runtime.RemoveNetwork(ctx, c.pipeline)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy runtime network:", err)
fmt.Fprintln(c.stdout, "unable to destroy runtime network:", err)
}

return err
Expand Down
19 changes: 19 additions & 0 deletions executor/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package local

import (
"os"
"reflect"
"sync"

Expand Down Expand Up @@ -33,9 +34,24 @@ type (
user *library.User
err error
streamRequests chan message.StreamRequest

// internal field partially exported for tests
stdout *os.File
mockStdoutReader *os.File
}

// MockedClient is for internal use to facilitate testing the local executor.
MockedClient interface {
MockStdout() *os.File
}
)

// MockStdout is for internal use to facilitate testing the local executor.
// MockStdout returns a reader over a mocked Stdout.
func (c *client) MockStdout() *os.File {
return c.mockStdoutReader
}

// equal returns true if the other client is the equivalent.
func Equal(a, b *client) bool {
// handle any nil comparisons
Expand Down Expand Up @@ -64,6 +80,9 @@ func New(opts ...Opt) (*client, error) {
// create new local client
c := new(client)

// Add stdout by default
c.stdout = os.Stdout

// instantiate streamRequests channel (which may be overridden using withStreamRequests()).
c.streamRequests = make(chan message.StreamRequest)

Expand Down
23 changes: 23 additions & 0 deletions executor/local/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package local

import (
"fmt"
"os"

"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/runtime"
Expand Down Expand Up @@ -121,6 +122,28 @@ func WithVersion(version string) Opt {
}
}

// WithMockStdout adds a mock stdout writer to the client if mock is true.
// If mock is true, then you must use a goroutine to read from
// MockStdout as quickly as possible, or writing to stdout will hang.
func WithMockStdout(mock bool) Opt {
return func(c *client) error {
if !mock {
return nil
}

// New() sets c.stdout = os.stdout, replace it if a mock is required.
reader, writer, err := os.Pipe()
if err != nil {
return err
}

c.mockStdoutReader = reader
c.stdout = writer

return nil
}
}

// withStreamRequests sets the streamRequests channel in the executor client for Linux
// (primarily used for tests).
func withStreamRequests(s chan message.StreamRequest) Opt {
Expand Down
3 changes: 1 addition & 2 deletions executor/local/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bufio"
"context"
"fmt"
"os"
"time"

"github.com/go-vela/worker/internal/message"
Expand Down Expand Up @@ -125,7 +124,7 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err
// scan entire container output
for scanner.Scan() {
// ensure we output to stdout
fmt.Fprintln(os.Stdout, _pattern, scanner.Text())
fmt.Fprintln(c.stdout, _pattern, scanner.Text())
}

return scanner.Err()
Expand Down
7 changes: 3 additions & 4 deletions executor/local/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package local
import (
"context"
"fmt"
"os"
"sync"

"github.com/go-vela/types/pipeline"
Expand All @@ -23,7 +22,7 @@ func (c *client) CreateStage(ctx context.Context, s *pipeline.Stage) error {
_pattern := fmt.Sprintf(stagePattern, c.init.Name, c.init.Name)

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Preparing step images for stage", s.Name, "...")
fmt.Fprintln(c.stdout, _pattern, "> Preparing step images for stage", s.Name, "...")

// create the steps for the stage
for _, _step := range s.Steps {
Expand All @@ -43,7 +42,7 @@ func (c *client) CreateStage(ctx context.Context, s *pipeline.Stage) error {
}

// output the image information to stdout
fmt.Fprintln(os.Stdout, _pattern, string(image))
fmt.Fprintln(c.stdout, _pattern, string(image))
}

return nil
Expand Down Expand Up @@ -121,7 +120,7 @@ func (c *client) DestroyStage(ctx context.Context, s *pipeline.Stage) error {
// destroy the step
err = c.DestroyStep(ctx, _step)
if err != nil {
fmt.Fprintln(os.Stdout, "unable to destroy step: ", err)
fmt.Fprintln(c.stdout, "unable to destroy step: ", err)
}
}

Expand Down
3 changes: 1 addition & 2 deletions executor/local/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bufio"
"context"
"fmt"
"os"
"time"

"github.com/go-vela/types/constants"
Expand Down Expand Up @@ -164,7 +163,7 @@ func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error
// scan entire container output
for scanner.Scan() {
// ensure we output to stdout
fmt.Fprintln(os.Stdout, _pattern, scanner.Text())
fmt.Fprintln(c.stdout, _pattern, scanner.Text())
}

return scanner.Err()
Expand Down
4 changes: 4 additions & 0 deletions executor/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type Setup struct {

// Executor Configuration

// Mock should only be true for tests.
Mock bool

// specifies the executor driver to use
Driver string
// specifies the executor method used to publish logs
Expand Down Expand Up @@ -106,6 +109,7 @@ func (s *Setup) Local() (Engine, error) {
local.WithUser(s.User),
local.WithVelaClient(s.Client),
local.WithVersion(s.Version),
local.WithMockStdout(s.Mock),
)
}

Expand Down

0 comments on commit ceefc0f

Please sign in to comment.