Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance(local executor): print to stdout via client field #339

Merged
merged 6 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (w *Worker) exec(index int) error {
// https://godoc.org/github.com/go-vela/worker/executor#New
_executor, err := executor.New(&executor.Setup{
Logger: logger,
Mock: w.Config.Mock,
Driver: w.Config.Executor.Driver,
LogMethod: w.Config.Executor.LogMethod,
MaxLogSize: w.Config.Executor.MaxLogSize,
Expand Down
1 change: 1 addition & 0 deletions cmd/vela-worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type (

// Config represents the worker configuration.
Config struct {
Mock bool // Mock should only be true for tests
API *API
Build *Build
CheckIn time.Duration
Expand Down
3 changes: 1 addition & 2 deletions executor/local/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package local
import (
"context"
"fmt"
"os"
"time"

"github.com/go-vela/types/constants"
Expand Down Expand Up @@ -193,7 +192,7 @@ func (c *client) CancelBuild() (*library.Build, error) {

err = c.DestroyBuild(context.Background())
if err != nil {
fmt.Fprintln(os.Stdout, "unable to destroy build:", err)
fmt.Fprintln(c.stdout, "unable to destroy build:", err)
}

return b, nil
Expand Down
43 changes: 21 additions & 22 deletions executor/local/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package local
import (
"context"
"fmt"
"os"
"sync"
"time"

Expand Down Expand Up @@ -97,7 +96,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
}

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Inspecting runtime network...")
fmt.Fprintln(c.stdout, _pattern, "> Inspecting runtime network...")

// inspect the runtime network for the pipeline
network, err := c.Runtime.InspectNetwork(ctx, c.pipeline)
Expand All @@ -107,7 +106,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
}

// output the network information to stdout
fmt.Fprintln(os.Stdout, _pattern, string(network))
fmt.Fprintln(c.stdout, _pattern, string(network))

// create the runtime volume for the pipeline
err = c.Runtime.CreateVolume(ctx, c.pipeline)
Expand All @@ -117,7 +116,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
}

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Inspecting runtime volume...")
fmt.Fprintln(c.stdout, _pattern, "> Inspecting runtime volume...")

// inspect the runtime volume for the pipeline
volume, err := c.Runtime.InspectVolume(ctx, c.pipeline)
Expand All @@ -127,7 +126,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
}

// output the volume information to stdout
fmt.Fprintln(os.Stdout, _pattern, string(volume))
fmt.Fprintln(c.stdout, _pattern, string(volume))

return c.err
}
Expand Down Expand Up @@ -162,7 +161,7 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Preparing service images...")
fmt.Fprintln(c.stdout, _pattern, "> Preparing service images...")

// create the services for the pipeline
for _, _service := range c.pipeline.Services {
Expand All @@ -183,11 +182,11 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// output the image information to stdout
fmt.Fprintln(os.Stdout, _pattern, string(image))
fmt.Fprintln(c.stdout, _pattern, string(image))
}

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Preparing stage images...")
fmt.Fprintln(c.stdout, _pattern, "> Preparing stage images...")

// create the stages for the pipeline
for _, _stage := range c.pipeline.Stages {
Expand All @@ -206,7 +205,7 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Preparing step images...")
fmt.Fprintln(c.stdout, _pattern, "> Preparing step images...")

// create the steps for the pipeline
for _, _step := range c.pipeline.Steps {
Expand All @@ -229,11 +228,11 @@ func (c *client) AssembleBuild(ctx context.Context) error {
}

// output the image information to stdout
fmt.Fprintln(os.Stdout, _pattern, string(image))
fmt.Fprintln(c.stdout, _pattern, string(image))
}

// output a new line for readability to stdout
fmt.Fprintln(os.Stdout, "")
fmt.Fprintln(c.stdout, "")

// assemble runtime build just before any containers execute
c.err = c.Runtime.AssembleBuild(ctx, c.pipeline)
Expand Down Expand Up @@ -353,14 +352,14 @@ func (c *client) StreamBuild(ctx context.Context) error {
streams, streamCtx := errgroup.WithContext(ctx)

defer func() {
fmt.Fprintln(os.Stdout, "waiting for stream functions to return")
fmt.Fprintln(c.stdout, "waiting for stream functions to return")

err := streams.Wait()
if err != nil {
fmt.Fprintln(os.Stdout, "error in a stream request:", err)
fmt.Fprintln(c.stdout, "error in a stream request:", err)
}

fmt.Fprintln(os.Stdout, "all stream functions have returned")
fmt.Fprintln(c.stdout, "all stream functions have returned")
}()

// allow the runtime to do log/event streaming setup at build-level
Expand All @@ -374,11 +373,11 @@ func (c *client) StreamBuild(ctx context.Context) error {
select {
case req := <-c.streamRequests:
streams.Go(func() error {
fmt.Fprintf(os.Stdout, "[%s: %s] > Streaming container '%s'...\n", req.Key, req.Container.Name, req.Container.ID)
fmt.Fprintf(c.stdout, "[%s: %s] > Streaming container '%s'...\n", req.Key, req.Container.Name, req.Container.ID)

err := req.Stream(streamCtx, req.Container)
if err != nil {
fmt.Fprintln(os.Stdout, "error streaming:", err)
fmt.Fprintln(c.stdout, "error streaming:", err)
}

return nil
Expand All @@ -399,7 +398,7 @@ func (c *client) DestroyBuild(ctx context.Context) error {
err = c.Runtime.RemoveBuild(ctx, c.pipeline)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy runtime build:", err)
fmt.Fprintln(c.stdout, "unable to destroy runtime build:", err)
}
}()

Expand All @@ -414,7 +413,7 @@ func (c *client) DestroyBuild(ctx context.Context) error {
err = c.DestroyStep(ctx, _step)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy step:", err)
fmt.Fprintln(c.stdout, "unable to destroy step:", err)
}
}

Expand All @@ -429,7 +428,7 @@ func (c *client) DestroyBuild(ctx context.Context) error {
err = c.DestroyStage(ctx, _stage)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy stage:", err)
fmt.Fprintln(c.stdout, "unable to destroy stage:", err)
}
}

Expand All @@ -439,22 +438,22 @@ func (c *client) DestroyBuild(ctx context.Context) error {
err = c.DestroyService(ctx, _service)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy service:", err)
fmt.Fprintln(c.stdout, "unable to destroy service:", err)
}
}

// remove the runtime volume for the pipeline
err = c.Runtime.RemoveVolume(ctx, c.pipeline)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy runtime volume:", err)
fmt.Fprintln(c.stdout, "unable to destroy runtime volume:", err)
}

// remove the runtime network for the pipeline
err = c.Runtime.RemoveNetwork(ctx, c.pipeline)
if err != nil {
// output the error information to stdout
fmt.Fprintln(os.Stdout, "unable to destroy runtime network:", err)
fmt.Fprintln(c.stdout, "unable to destroy runtime network:", err)
}

return err
Expand Down
19 changes: 19 additions & 0 deletions executor/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package local

import (
"os"
"reflect"
"sync"

Expand Down Expand Up @@ -33,9 +34,24 @@ type (
user *library.User
err error
streamRequests chan message.StreamRequest

// internal field partially exported for tests
stdout *os.File
mockStdoutReader *os.File
}

// MockedClient is for internal use to facilitate testing the local executor.
MockedClient interface {
MockStdout() *os.File
}
)

// MockStdout is for internal use to facilitate testing the local executor.
// MockStdout returns a reader over a mocked Stdout.
func (c *client) MockStdout() *os.File {
return c.mockStdoutReader
}

// equal returns true if the other client is the equivalent.
func Equal(a, b *client) bool {
// handle any nil comparisons
Expand Down Expand Up @@ -64,6 +80,9 @@ func New(opts ...Opt) (*client, error) {
// create new local client
c := new(client)

// Add stdout by default
c.stdout = os.Stdout

// instantiate streamRequests channel (which may be overridden using withStreamRequests()).
c.streamRequests = make(chan message.StreamRequest)

Expand Down
23 changes: 23 additions & 0 deletions executor/local/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package local

import (
"fmt"
"os"

"github.com/go-vela/worker/internal/message"
"github.com/go-vela/worker/runtime"
Expand Down Expand Up @@ -121,6 +122,28 @@ func WithVersion(version string) Opt {
}
}

// WithMockStdout adds a mock stdout writer to the client if mock is true.
// If mock is true, then you must use a goroutine to read from
// MockStdout as quickly as possible, or writing to stdout will hang.
func WithMockStdout(mock bool) Opt {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think this function should be exported (public)?

I could be missing something but it appears like it would only be used for testing purposes.

If so, I wonder if it should be unexported (private) like withStreamRequests()?

// withStreamRequests sets the streamRequests channel in the executor client for Linux
// (primarily used for tests).
func withStreamRequests(s chan message.StreamRequest) Opt {
return func(c *client) error {
// set the streamRequests channel in the client
c.streamRequests = s
return nil
}
}

Copy link
Member Author

@cognifloyd cognifloyd May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withStreamRequests is used by tests in the same package (executor.local). But, WithMockStdout is used in a different package (executor) for tests in yet another package (I want to write tests for the exec() function in package main under cmd/vela-worker)

worker/executor/setup.go

Lines 97 to 114 in 0a0f0bd

func (s *Setup) Local() (Engine, error) {
logrus.Trace("creating local executor client from setup")
// create new Local executor engine
//
// https://pkg.go.dev/github.com/go-vela/worker/executor/local?tab=doc#New
return local.New(
local.WithBuild(s.Build),
local.WithHostname(s.Hostname),
local.WithPipeline(s.Pipeline),
local.WithRepo(s.Repo),
local.WithRuntime(s.Runtime),
local.WithUser(s.User),
local.WithVelaClient(s.Client),
local.WithVersion(s.Version),
local.WithMockStdout(s.Mock),
)
}

So, I'm open to not exporting it, but I didn't see a good way to allow tests in other packages to trigger it.

return func(c *client) error {
if !mock {
return nil
}

// New() sets c.stdout = os.stdout, replace it if a mock is required.
reader, writer, err := os.Pipe()
if err != nil {
return err
}

c.mockStdoutReader = reader
c.stdout = writer

return nil
}
}

// withStreamRequests sets the streamRequests channel in the executor client for Linux
// (primarily used for tests).
func withStreamRequests(s chan message.StreamRequest) Opt {
Expand Down
36 changes: 36 additions & 0 deletions executor/local/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,39 @@ func TestLocal_Opt_WithVersion(t *testing.T) {
})
}
}

func TestLocal_Opt_WithMockStdout(t *testing.T) {
// setup tests
tests := []struct {
name string
mock bool
wantNil bool
}{
{
name: "standard",
mock: false,
wantNil: true,
},
{
name: "mocked",
mock: true,
wantNil: false,
},
}

// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := New(
WithMockStdout(test.mock),
)
if err != nil {
t.Errorf("unable to create local engine: %v", err)
}

if !reflect.DeepEqual(_engine.MockStdout() == nil, test.wantNil) {
t.Errorf("WithMockStdout is %v, wantNil = %v", _engine.MockStdout() == nil, test.wantNil)
}
})
}
}
3 changes: 1 addition & 2 deletions executor/local/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bufio"
"context"
"fmt"
"os"
"time"

"github.com/go-vela/worker/internal/message"
Expand Down Expand Up @@ -125,7 +124,7 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err
// scan entire container output
for scanner.Scan() {
// ensure we output to stdout
fmt.Fprintln(os.Stdout, _pattern, scanner.Text())
fmt.Fprintln(c.stdout, _pattern, scanner.Text())
}

return scanner.Err()
Expand Down
7 changes: 3 additions & 4 deletions executor/local/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package local
import (
"context"
"fmt"
"os"
"sync"

"github.com/go-vela/types/pipeline"
Expand All @@ -23,7 +22,7 @@ func (c *client) CreateStage(ctx context.Context, s *pipeline.Stage) error {
_pattern := fmt.Sprintf(stagePattern, c.init.Name, c.init.Name)

// output init progress to stdout
fmt.Fprintln(os.Stdout, _pattern, "> Preparing step images for stage", s.Name, "...")
fmt.Fprintln(c.stdout, _pattern, "> Preparing step images for stage", s.Name, "...")

// create the steps for the stage
for _, _step := range s.Steps {
Expand All @@ -43,7 +42,7 @@ func (c *client) CreateStage(ctx context.Context, s *pipeline.Stage) error {
}

// output the image information to stdout
fmt.Fprintln(os.Stdout, _pattern, string(image))
fmt.Fprintln(c.stdout, _pattern, string(image))
}

return nil
Expand Down Expand Up @@ -121,7 +120,7 @@ func (c *client) DestroyStage(ctx context.Context, s *pipeline.Stage) error {
// destroy the step
err = c.DestroyStep(ctx, _step)
if err != nil {
fmt.Fprintln(os.Stdout, "unable to destroy step: ", err)
fmt.Fprintln(c.stdout, "unable to destroy step: ", err)
}
}

Expand Down
3 changes: 1 addition & 2 deletions executor/local/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bufio"
"context"
"fmt"
"os"
"time"

"github.com/go-vela/types/constants"
Expand Down Expand Up @@ -164,7 +163,7 @@ func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error
// scan entire container output
for scanner.Scan() {
// ensure we output to stdout
fmt.Fprintln(os.Stdout, _pattern, scanner.Text())
fmt.Fprintln(c.stdout, _pattern, scanner.Text())
}

return scanner.Err()
Expand Down
Loading