From 151849034f86575046422c1407d33fc1aafa13b1 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 25 Apr 2022 22:33:20 +0000 Subject: [PATCH 1/2] fix(deps): update module github.com/urfave/cli/v2 to v2.4.8 (#320) --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 8d01a079..11c45f57 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/opencontainers/image-spec v1.0.2 github.com/prometheus/client_golang v1.12.1 github.com/sirupsen/logrus v1.8.1 - github.com/urfave/cli/v2 v2.4.4 + github.com/urfave/cli/v2 v2.4.8 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c gotest.tools/v3 v3.2.0 k8s.io/api v0.23.6 diff --git a/go.sum b/go.sum index 6a394c7c..d28c346b 100644 --- a/go.sum +++ b/go.sum @@ -591,8 +591,8 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY github.com/ugorji/go/codec v1.1.11 h1:GaQDxjNe1J3vCZvlVaDjUIHIbFuUByFXY7rMqnhB5ck= github.com/ugorji/go/codec v1.1.11/go.mod h1:svMFxxx5FVQJPnJ9vbpAgscNufuiXDyldvzApI86qQo= github.com/urfave/cli/v2 v2.4.0/go.mod h1:NX9W0zmTvedE5oDoOMs2RTC8RvdK98NTYZE5LbaEYPg= -github.com/urfave/cli/v2 v2.4.4 h1:IvwT3XfI6RytTmIzC35UAu9oyK+bHgUPXDDZNqribkI= -github.com/urfave/cli/v2 v2.4.4/go.mod h1:oDzoM7pVwz6wHn5ogWgFUU1s4VJayeQS+aEZDqXIEJs= +github.com/urfave/cli/v2 v2.4.8 h1:9HuvvddU3oEJr1tJlwUVVsk3snVWMuKSpyAO+SzTNuI= +github.com/urfave/cli/v2 v2.4.8/go.mod h1:oDzoM7pVwz6wHn5ogWgFUU1s4VJayeQS+aEZDqXIEJs= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= From c0823e8902a6b6d7f96f3ea44a604c833878c4c8 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Thu, 28 Apr 2022 09:35:54 -0500 Subject: [PATCH 2/2] enhancement: Make context handling for log streaming more explicit (#317) --- cmd/vela-worker/exec.go | 15 +- executor/engine.go | 3 + executor/executor_test.go | 26 +++- executor/linux/build.go | 44 ++++++ executor/linux/build_test.go | 257 +++++++++++++++++++++++++++++++++ executor/linux/linux.go | 49 ++++++- executor/linux/linux_test.go | 88 +++++++++++ executor/linux/opts.go | 14 ++ executor/linux/opts_test.go | 51 +++++++ executor/linux/secret.go | 17 ++- executor/linux/secret_test.go | 5 + executor/linux/service.go | 23 +-- executor/linux/service_test.go | 5 + executor/linux/stage_test.go | 5 + executor/linux/step.go | 23 +-- executor/linux/step_test.go | 5 + executor/local/build.go | 39 +++++ executor/local/build_test.go | 226 ++++++++++++++++++++++++++++- executor/local/local.go | 43 +++++- executor/local/local_test.go | 88 +++++++++++ executor/local/opts.go | 12 ++ executor/local/service.go | 14 +- executor/local/service_test.go | 5 + executor/local/stage_test.go | 5 + executor/local/step.go | 15 +- executor/local/step_test.go | 5 + executor/setup_test.go | 14 +- internal/message/doc.go | 11 ++ internal/message/stream.go | 44 ++++++ 29 files changed, 1065 insertions(+), 86 deletions(-) create mode 100644 internal/message/doc.go create mode 100644 internal/message/stream.go diff --git a/cmd/vela-worker/exec.go b/cmd/vela-worker/exec.go index 984e31dd..2fa746b0 100644 --- a/cmd/vela-worker/exec.go +++ b/cmd/vela-worker/exec.go @@ -94,11 +94,12 @@ func (w *Worker) exec(index int) error { } // create a background context - ctx := context.Background() + buildCtx, done := context.WithCancel(context.Background()) + defer done() // add to the background context with a timeout // built in for ensuring a build doesn't run forever - ctx, timeout := context.WithTimeout(ctx, t) + ctx, timeout := context.WithTimeout(buildCtx, t) defer timeout() defer func() { @@ -128,6 +129,16 @@ func (w *Worker) exec(index int) error { return nil } + // log streaming uses buildCtx so that it is not subject to the timeout. + go func() { + logger.Info("streaming build logs") + // execute the build with the executor + err = _executor.StreamBuild(buildCtx) + if err != nil { + logger.Errorf("unable to stream build logs: %v", err) + } + }() + logger.Info("assembling build") // assemble the build with the executor err = _executor.AssembleBuild(ctx) diff --git a/executor/engine.go b/executor/engine.go index b3fba545..7f2f9048 100644 --- a/executor/engine.go +++ b/executor/engine.go @@ -53,6 +53,9 @@ type Engine interface { // ExecBuild defines a function that // runs a pipeline for a build. ExecBuild(context.Context) error + // StreamBuild defines a function that receives a StreamRequest + // and then runs StreamService or StreamStep in a goroutine. + StreamBuild(context.Context) error // DestroyBuild defines a function that // cleans up the build after execution. DestroyBuild(context.Context) error diff --git a/executor/executor_test.go b/executor/executor_test.go index e69c2cea..fa2537ac 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/gin-gonic/gin" + "github.com/google/go-cmp/cmp" "github.com/go-vela/server/mock/server" @@ -77,6 +78,7 @@ func TestExecutor_New(t *testing.T) { failure bool setup *Setup want Engine + equal interface{} }{ { name: "driver-darwin", @@ -91,7 +93,8 @@ func TestExecutor_New(t *testing.T) { User: _user, Version: "v1.0.0", }, - want: nil, + want: nil, + equal: reflect.DeepEqual, }, { name: "driver-linux", @@ -108,7 +111,8 @@ func TestExecutor_New(t *testing.T) { User: _user, Version: "v1.0.0", }, - want: _linux, + want: _linux, + equal: linux.Equal, }, { name: "driver-local", @@ -123,7 +127,8 @@ func TestExecutor_New(t *testing.T) { User: _user, Version: "v1.0.0", }, - want: _local, + want: _local, + equal: local.Equal, }, { name: "driver-windows", @@ -138,7 +143,8 @@ func TestExecutor_New(t *testing.T) { User: _user, Version: "v1.0.0", }, - want: nil, + want: nil, + equal: reflect.DeepEqual, }, { name: "driver-invalid", @@ -153,7 +159,8 @@ func TestExecutor_New(t *testing.T) { User: _user, Version: "v1.0.0", }, - want: nil, + want: nil, + equal: reflect.DeepEqual, }, { name: "driver-empty", @@ -168,7 +175,8 @@ func TestExecutor_New(t *testing.T) { User: _user, Version: "v1.0.0", }, - want: nil, + want: nil, + equal: reflect.DeepEqual, }, } @@ -193,8 +201,10 @@ func TestExecutor_New(t *testing.T) { t.Errorf("New returned err: %v", err) } - if !reflect.DeepEqual(got, test.want) { - t.Errorf("New is %v, want %v", got, test.want) + // Comparing with reflect.DeepEqual(x, y interface) panics due to the + // unexported streamRequests channel. + if diff := cmp.Diff(test.want, got, cmp.Comparer(test.equal)); diff != "" { + t.Errorf("engine mismatch (-want +got):\n%v", diff) } }) } diff --git a/executor/linux/build.go b/executor/linux/build.go index 2b5ba4a1..b488aae0 100644 --- a/executor/linux/build.go +++ b/executor/linux/build.go @@ -491,6 +491,50 @@ func (c *client) ExecBuild(ctx context.Context) error { return c.err } +// StreamBuild receives a StreamRequest and then +// runs StreamService or StreamStep in a goroutine. +func (c *client) StreamBuild(ctx context.Context) error { + // create an error group with the parent context + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext + streams, streamCtx := errgroup.WithContext(ctx) + + defer func() { + c.Logger.Trace("waiting for stream functions to return") + + err := streams.Wait() + if err != nil { + c.Logger.Errorf("error in a stream request, %v", err) + } + + c.Logger.Info("all stream functions have returned") + }() + + for { + select { + case req := <-c.streamRequests: + streams.Go(func() error { + // update engine logger with step metadata + // + // https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField + logger := c.Logger.WithField(req.Key, req.Container.Name) + + logger.Debugf("streaming %s container %s", req.Key, req.Container.ID) + + err := req.Stream(streamCtx, req.Container) + if err != nil { + logger.Error(err) + } + + return nil + }) + case <-ctx.Done(): + // build done or canceled + return nil + } + } +} + // DestroyBuild cleans up the build after execution. func (c *client) DestroyBuild(ctx context.Context) error { var err error diff --git a/executor/linux/build_test.go b/executor/linux/build_test.go index 5c9a742f..a591b7d4 100644 --- a/executor/linux/build_test.go +++ b/executor/linux/build_test.go @@ -9,16 +9,19 @@ import ( "flag" "net/http/httptest" "testing" + "time" "github.com/go-vela/server/compiler/native" "github.com/go-vela/server/mock/server" "github.com/urfave/cli/v2" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime/docker" "github.com/go-vela/sdk-go/vela" "github.com/go-vela/types/library" + "github.com/go-vela/types/pipeline" "github.com/gin-gonic/gin" ) @@ -247,6 +250,9 @@ func TestLinux_AssembleBuild(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + tests := []struct { name string failure bool @@ -334,6 +340,7 @@ func TestLinux_AssembleBuild(t *testing.T) { WithRuntime(_runtime), WithUser(_user), WithVelaClient(_client), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) @@ -385,6 +392,9 @@ func TestLinux_ExecBuild(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + tests := []struct { name string failure bool @@ -442,6 +452,7 @@ func TestLinux_ExecBuild(t *testing.T) { WithRuntime(_runtime), WithUser(_user), WithVelaClient(_client), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) @@ -504,6 +515,252 @@ func TestLinux_ExecBuild(t *testing.T) { } } +func TestLinux_StreamBuild(t *testing.T) { + // setup types + compiler, _ := native.New(cli.NewContext(nil, flag.NewFlagSet("test", 0), nil)) + + _build := testBuild() + _repo := testRepo() + _user := testUser() + _metadata := testMetadata() + + gin.SetMode(gin.TestMode) + + s := httptest.NewServer(server.FakeHandler()) + + _client, err := vela.NewClient(s.URL, "", nil) + if err != nil { + t.Errorf("unable to create Vela API client: %v", err) + } + + _runtime, err := docker.NewMock() + if err != nil { + t.Errorf("unable to create runtime engine: %v", err) + } + + type planFuncType = func(context.Context, *pipeline.Container) error + + // planNothing is a planFuncType that does nothing + planNothing := func(ctx context.Context, container *pipeline.Container) error { + return nil + } + + tests := []struct { + name string + failure bool + pipeline string + messageKey string + ctn *pipeline.Container + streamFunc func(*client) message.StreamFunc + planFunc func(*client) planFuncType + }{ + { + name: "basic services pipeline", + failure: false, + pipeline: "testdata/build/services/basic.yml", + messageKey: "service", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamService + }, + planFunc: func(c *client) planFuncType { + return c.PlanService + }, + ctn: &pipeline.Container{ + ID: "service_github_octocat_1_postgres", + Detach: true, + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "postgres:latest", + Name: "postgres", + Number: 1, + Ports: []string{"5432:5432"}, + Pull: "not_present", + }, + }, + { + name: "basic services pipeline with StreamService failure", + failure: false, + pipeline: "testdata/build/services/basic.yml", + messageKey: "service", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamService + }, + planFunc: func(c *client) planFuncType { + // simulate failure to call PlanService + return planNothing + }, + ctn: &pipeline.Container{ + ID: "service_github_octocat_1_postgres", + Detach: true, + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "postgres:latest", + Name: "postgres", + Number: 1, + Ports: []string{"5432:5432"}, + Pull: "not_present", + }, + }, + { + name: "basic steps pipeline", + failure: false, + pipeline: "testdata/build/steps/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + return c.PlanStep + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, + { + name: "basic steps pipeline with StreamStep failure", + failure: false, + pipeline: "testdata/build/steps/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + // simulate failure to call PlanStep + return planNothing + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, + { + name: "basic stages pipeline", + failure: false, + pipeline: "testdata/build/stages/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + return c.PlanStep + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, + { + name: "basic secrets pipeline", + failure: false, + pipeline: "testdata/build/secrets/basic.yml", + messageKey: "secret", + streamFunc: func(c *client) message.StreamFunc { + return c.secret.stream + }, + planFunc: func(c *client) planFuncType { + // no plan function equivalent for secret containers + return planNothing + }, + ctn: &pipeline.Container{ + ID: "secret_github_octocat_1_vault", + Directory: "/vela/src/vcs.company.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "target/secret-vault:latest", + Name: "vault", + Number: 1, + Pull: "not_present", + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + buildCtx, done := context.WithCancel(context.Background()) + defer done() + + streamRequests := make(chan message.StreamRequest) + + _pipeline, err := compiler. + WithBuild(_build). + WithRepo(_repo). + WithMetadata(_metadata). + WithUser(_user). + Compile(test.pipeline) + if err != nil { + t.Errorf("unable to compile pipeline %s: %v", test.pipeline, err) + } + + _engine, err := New( + WithBuild(_build), + WithPipeline(_pipeline), + WithRepo(_repo), + WithRuntime(_runtime), + WithUser(_user), + WithVelaClient(_client), + withStreamRequests(streamRequests), + ) + if err != nil { + t.Errorf("unable to create executor engine: %v", err) + } + + // run create to init steps to be created properly + err = _engine.CreateBuild(buildCtx) + if err != nil { + t.Errorf("unable to create build: %v", err) + } + + // simulate ExecBuild() which runs concurrently with StreamBuild() + go func() { + // ExecBuild calls PlanService()/PlanStep() before ExecService()/ExecStep() + // (ExecStage() calls PlanStep() before ExecStep()). + _engine.err = test.planFunc(_engine)(buildCtx, test.ctn) + + // ExecService()/ExecStep()/secret.exec() send this message + streamRequests <- message.StreamRequest{ + Key: test.messageKey, + Stream: test.streamFunc(_engine), + Container: test.ctn, + } + + // simulate exec build duration + time.Sleep(100 * time.Microsecond) + + // signal the end of the build so StreamBuild can terminate + done() + }() + + err = _engine.StreamBuild(buildCtx) + + if test.failure { + if err == nil { + t.Errorf("StreamBuild for %s should have returned err", test.pipeline) + } + + return // continue to next test + } + + if err != nil { + t.Errorf("StreamBuild for %s returned err: %v", test.pipeline, err) + } + }) + } +} + func TestLinux_DestroyBuild(t *testing.T) { // setup types compiler, _ := native.New(cli.NewContext(nil, flag.NewFlagSet("test", 0), nil)) diff --git a/executor/linux/linux.go b/executor/linux/linux.go index 71ccd041..fe388cd0 100644 --- a/executor/linux/linux.go +++ b/executor/linux/linux.go @@ -5,11 +5,13 @@ package linux import ( + "reflect" "sync" "github.com/go-vela/sdk-go/vela" "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime" "github.com/sirupsen/logrus" ) @@ -29,19 +31,20 @@ type ( secret *secretSvc // private fields - init *pipeline.Container - logMethod string - maxLogSize uint - build *library.Build - pipeline *pipeline.Build - repo *library.Repo - // nolint: structcheck,unused // ignore false positives + init *pipeline.Container + logMethod string + maxLogSize uint + build *library.Build + pipeline *pipeline.Build + repo *library.Repo secrets sync.Map services sync.Map serviceLogs sync.Map steps sync.Map stepLogs sync.Map + streamRequests chan message.StreamRequest + user *library.User err error } @@ -52,6 +55,35 @@ type ( } ) +// Equal returns true if the other client is the equivalent. +func Equal(a, b *client) bool { + // handle any nil comparisons + if a == nil || b == nil { + return a == nil && b == nil + } + + return reflect.DeepEqual(a.Logger, b.Logger) && + reflect.DeepEqual(a.Vela, b.Vela) && + reflect.DeepEqual(a.Runtime, b.Runtime) && + reflect.DeepEqual(a.Secrets, b.Secrets) && + a.Hostname == b.Hostname && + a.Version == b.Version && + reflect.DeepEqual(a.init, b.init) && + a.logMethod == b.logMethod && + a.maxLogSize == b.maxLogSize && + reflect.DeepEqual(a.build, b.build) && + reflect.DeepEqual(a.pipeline, b.pipeline) && + reflect.DeepEqual(a.repo, b.repo) && + reflect.DeepEqual(&a.secrets, &b.secrets) && + reflect.DeepEqual(&a.services, &b.services) && + reflect.DeepEqual(&a.serviceLogs, &b.serviceLogs) && + reflect.DeepEqual(&a.steps, &b.steps) && + reflect.DeepEqual(&a.stepLogs, &b.stepLogs) && + // do not compare streamRequests channel + reflect.DeepEqual(a.user, b.user) && + reflect.DeepEqual(a.err, b.err) +} + // New returns an Executor implementation that integrates with a Linux instance. // // nolint: revive // ignore unexported type as it is intentional @@ -69,6 +101,9 @@ func New(opts ...Opt) (*client, error) { // https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#NewEntry c.Logger = logrus.NewEntry(logger) + // instantiate streamRequests channel (which may be overridden using withStreamRequests()). + c.streamRequests = make(chan message.StreamRequest) + // apply all provided configuration options for _, opt := range opts { err := opt(c) diff --git a/executor/linux/linux_test.go b/executor/linux/linux_test.go index 82911867..46abe49b 100644 --- a/executor/linux/linux_test.go +++ b/executor/linux/linux_test.go @@ -21,6 +21,94 @@ import ( "github.com/go-vela/types/pipeline" ) +func TestEqual(t *testing.T) { + // setup types + gin.SetMode(gin.TestMode) + + s := httptest.NewServer(server.FakeHandler()) + + _client, err := vela.NewClient(s.URL, "", nil) + if err != nil { + t.Errorf("unable to create Vela API client: %v", err) + } + + _runtime, err := docker.NewMock() + if err != nil { + t.Errorf("unable to create runtime engine: %v", err) + } + + _linux, err := New( + WithBuild(testBuild()), + WithHostname("localhost"), + WithPipeline(testSteps()), + WithRepo(testRepo()), + WithRuntime(_runtime), + WithUser(testUser()), + WithVelaClient(_client), + ) + if err != nil { + t.Errorf("unable to create linux executor: %v", err) + } + + _alternate, err := New( + WithBuild(testBuild()), + WithHostname("a.different.host"), + WithPipeline(testSteps()), + WithRepo(testRepo()), + WithRuntime(_runtime), + WithUser(testUser()), + WithVelaClient(_client), + ) + if err != nil { + t.Errorf("unable to create alternate local executor: %v", err) + } + + tests := []struct { + name string + a *client + b *client + want bool + }{ + { + name: "both nil", + a: nil, + b: nil, + want: true, + }, + { + name: "left nil", + a: nil, + b: _linux, + want: false, + }, + { + name: "right nil", + a: _linux, + b: nil, + want: false, + }, + { + name: "equal", + a: _linux, + b: _linux, + want: true, + }, + { + name: "not equal", + a: _linux, + b: _alternate, + want: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if got := Equal(test.a, test.b); got != test.want { + t.Errorf("Equal() = %v, want %v", got, test.want) + } + }) + } +} + func TestLinux_New(t *testing.T) { // setup types gin.SetMode(gin.TestMode) diff --git a/executor/linux/opts.go b/executor/linux/opts.go index 04f23146..4095a055 100644 --- a/executor/linux/opts.go +++ b/executor/linux/opts.go @@ -10,6 +10,7 @@ import ( "github.com/go-vela/sdk-go/vela" "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime" "github.com/sirupsen/logrus" ) @@ -198,3 +199,16 @@ func WithVersion(version string) Opt { return nil } } + +// withStreamRequests sets the streamRequests channel in the executor client for Linux +// (primarily used for tests). +func withStreamRequests(s chan message.StreamRequest) Opt { + return func(c *client) error { + c.Logger.Trace("configuring stream requests in linux executor client") + + // set the streamRequests channel in the client + c.streamRequests = s + + return nil + } +} diff --git a/executor/linux/opts_test.go b/executor/linux/opts_test.go index 9e4ed641..ac49f04b 100644 --- a/executor/linux/opts_test.go +++ b/executor/linux/opts_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" "github.com/go-vela/server/mock/server" @@ -159,6 +160,7 @@ func TestLinux_Opt_WithMaxLogSize(t *testing.T) { }) } } + func TestLinux_Opt_WithHostname(t *testing.T) { // setup tests tests := []struct { @@ -195,6 +197,55 @@ func TestLinux_Opt_WithHostname(t *testing.T) { } } +func TestLinux_Opt_WithLogger(t *testing.T) { + // setup tests + tests := []struct { + name string + failure bool + logger *logrus.Entry + }{ + { + name: "provided logger", + failure: false, + logger: &logrus.Entry{}, + }, + { + name: "nil logger", + failure: false, + logger: nil, + }, + } + + // run tests + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _engine, err := New( + WithLogger(test.logger), + ) + + if test.failure { + if err == nil { + t.Errorf("WithLogger should have returned err") + } + + return // continue to next test + } + + if err != nil { + t.Errorf("WithLogger returned err: %v", err) + } + + if test.logger == nil && _engine.Logger == nil { + t.Errorf("_engine.Logger should not be nil even if nil is passed to WithLogger") + } + + if test.logger != nil && !reflect.DeepEqual(_engine.Logger, test.logger) { + t.Errorf("WithLogger set %v, want %v", _engine.Logger, test.logger) + } + }) + } +} + func TestLinux_Opt_WithPipeline(t *testing.T) { // setup types _steps := testSteps() diff --git a/executor/linux/secret.go b/executor/linux/secret.go index ad2c2b25..6417070d 100644 --- a/executor/linux/secret.go +++ b/executor/linux/secret.go @@ -16,6 +16,7 @@ import ( "github.com/go-vela/types/constants" "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/internal/step" "github.com/sirupsen/logrus" @@ -135,14 +136,12 @@ func (s *secretSvc) exec(ctx context.Context, p *pipeline.SecretSlice) error { return err } - go func() { - logger.Debug("stream logs for container") - // stream logs from container - err = s.client.secret.stream(ctx, _secret.Origin) - if err != nil { - logger.Error(err) - } - }() + // trigger StreamStep goroutine with logging context + s.client.streamRequests <- message.StreamRequest{ + Key: "secret", + Stream: s.stream, + Container: _secret.Origin, + } logger.Debug("waiting for container") // wait for the runtime container @@ -325,6 +324,8 @@ func (s *secretSvc) stream(ctx context.Context, ctn *pipeline.Container) error { } } + logger.Info("finished streaming logs") + return scanner.Err() } diff --git a/executor/linux/secret_test.go b/executor/linux/secret_test.go index 24427f8c..b0643661 100644 --- a/executor/linux/secret_test.go +++ b/executor/linux/secret_test.go @@ -17,6 +17,7 @@ import ( "github.com/go-vela/server/compiler/native" "github.com/go-vela/server/mock/server" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime/docker" "github.com/go-vela/sdk-go/vela" @@ -265,6 +266,9 @@ func TestLinux_Secret_exec(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + // setup tests tests := []struct { name string @@ -302,6 +306,7 @@ func TestLinux_Secret_exec(t *testing.T) { WithRuntime(_runtime), WithUser(_user), WithVelaClient(_client), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) diff --git a/executor/linux/service.go b/executor/linux/service.go index 66fa3309..b20cc22e 100644 --- a/executor/linux/service.go +++ b/executor/linux/service.go @@ -15,8 +15,8 @@ import ( "github.com/go-vela/types/constants" "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/internal/service" - "golang.org/x/sync/errgroup" ) // CreateService configures the service for execution. @@ -143,21 +143,12 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error return err } - // create an error group with the parent context - // - // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext - logs, logCtx := errgroup.WithContext(ctx) - - logs.Go(func() error { - logger.Debug("streaming logs for container") - // stream logs from container - err := c.StreamService(logCtx, ctn) - if err != nil { - logger.Error(err) - } - - return nil - }) + // trigger StreamService goroutine with logging context + c.streamRequests <- message.StreamRequest{ + Key: "service", + Stream: c.StreamService, + Container: ctn, + } return nil } diff --git a/executor/linux/service_test.go b/executor/linux/service_test.go index 70ca8aec..c95a488c 100644 --- a/executor/linux/service_test.go +++ b/executor/linux/service_test.go @@ -13,6 +13,7 @@ import ( "github.com/go-vela/server/mock/server" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime/docker" "github.com/go-vela/sdk-go/vela" @@ -231,6 +232,9 @@ func TestLinux_ExecService(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + // setup tests tests := []struct { name string @@ -284,6 +288,7 @@ func TestLinux_ExecService(t *testing.T) { WithRuntime(_runtime), WithUser(_user), WithVelaClient(_client), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) diff --git a/executor/linux/stage_test.go b/executor/linux/stage_test.go index 0b2ad832..42eeff12 100644 --- a/executor/linux/stage_test.go +++ b/executor/linux/stage_test.go @@ -18,6 +18,7 @@ import ( "github.com/go-vela/server/compiler/native" "github.com/go-vela/server/mock/server" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime/docker" "github.com/go-vela/sdk-go/vela" @@ -302,6 +303,9 @@ func TestLinux_ExecStage(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + // setup tests tests := []struct { name string @@ -377,6 +381,7 @@ func TestLinux_ExecStage(t *testing.T) { WithRuntime(_runtime), WithUser(_user), WithVelaClient(_client), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) diff --git a/executor/linux/step.go b/executor/linux/step.go index f823aede..49c755e1 100644 --- a/executor/linux/step.go +++ b/executor/linux/step.go @@ -16,8 +16,8 @@ import ( "github.com/go-vela/types/constants" "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/internal/step" - "golang.org/x/sync/errgroup" ) // CreateStep configures the step for execution. @@ -155,21 +155,12 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error { return err } - // create an error group with the parent context - // - // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext - logs, logCtx := errgroup.WithContext(ctx) - - logs.Go(func() error { - logger.Debug("streaming logs for container") - // stream logs from container - err := c.StreamStep(logCtx, ctn) - if err != nil { - logger.Error(err) - } - - return nil - }) + // trigger StreamStep goroutine with logging context + c.streamRequests <- message.StreamRequest{ + Key: "step", + Stream: c.StreamStep, + Container: ctn, + } // do not wait for detached containers if ctn.Detach { diff --git a/executor/linux/step_test.go b/executor/linux/step_test.go index 4012238c..078dce96 100644 --- a/executor/linux/step_test.go +++ b/executor/linux/step_test.go @@ -15,6 +15,7 @@ import ( "github.com/go-vela/server/mock/server" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime/docker" "github.com/go-vela/sdk-go/vela" @@ -238,6 +239,9 @@ func TestLinux_ExecStep(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + // setup tests tests := []struct { name string @@ -314,6 +318,7 @@ func TestLinux_ExecStep(t *testing.T) { WithRuntime(_runtime), WithUser(_user), WithVelaClient(_client), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) diff --git a/executor/local/build.go b/executor/local/build.go index 21c3efaf..c079787b 100644 --- a/executor/local/build.go +++ b/executor/local/build.go @@ -344,6 +344,45 @@ func (c *client) ExecBuild(ctx context.Context) error { return c.err } +// StreamBuild receives a StreamRequest and then +// runs StreamService or StreamStep in a goroutine. +func (c *client) StreamBuild(ctx context.Context) error { + // create an error group with the parent context + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext + streams, streamCtx := errgroup.WithContext(ctx) + + defer func() { + fmt.Fprintln(os.Stdout, "waiting for stream functions to return") + + err := streams.Wait() + if err != nil { + fmt.Fprintln(os.Stdout, "error in a stream request:", err) + } + + fmt.Fprintln(os.Stdout, "all stream functions have returned") + }() + + for { + select { + case req := <-c.streamRequests: + streams.Go(func() error { + fmt.Fprintf(os.Stdout, "streaming %s container %s", req.Key, req.Container.ID) + + err := req.Stream(streamCtx, req.Container) + if err != nil { + fmt.Fprintln(os.Stdout, "error streaming:", err) + } + + return nil + }) + case <-ctx.Done(): + // build done or canceled + return nil + } + } +} + // DestroyBuild cleans up the build after execution. func (c *client) DestroyBuild(ctx context.Context) error { var err error diff --git a/executor/local/build_test.go b/executor/local/build_test.go index fa95675c..35a2ada1 100644 --- a/executor/local/build_test.go +++ b/executor/local/build_test.go @@ -8,10 +8,13 @@ import ( "context" "flag" "testing" + "time" - "github.com/go-vela/server/compiler/native" "github.com/urfave/cli/v2" + "github.com/go-vela/server/compiler/native" + "github.com/go-vela/types/pipeline" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime/docker" ) @@ -186,6 +189,9 @@ func TestLocal_AssembleBuild(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + tests := []struct { name string failure bool @@ -257,6 +263,7 @@ func TestLocal_AssembleBuild(t *testing.T) { WithRepo(_repo), WithRuntime(_runtime), WithUser(_user), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) @@ -298,6 +305,9 @@ func TestLocal_ExecBuild(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + tests := []struct { name string failure bool @@ -354,6 +364,7 @@ func TestLocal_ExecBuild(t *testing.T) { WithRepo(_repo), WithRuntime(_runtime), WithUser(_user), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) @@ -382,6 +393,219 @@ func TestLocal_ExecBuild(t *testing.T) { } } +func TestLocal_StreamBuild(t *testing.T) { + // setup types + compiler, _ := native.New(cli.NewContext(nil, flag.NewFlagSet("test", 0), nil)) + + _build := testBuild() + _repo := testRepo() + _user := testUser() + + _runtime, err := docker.NewMock() + if err != nil { + t.Errorf("unable to create runtime engine: %v", err) + } + + type planFuncType = func(context.Context, *pipeline.Container) error + + // planNothing is a planFuncType that does nothing + planNothing := func(ctx context.Context, container *pipeline.Container) error { + return nil + } + + tests := []struct { + name string + failure bool + pipeline string + messageKey string + ctn *pipeline.Container + streamFunc func(*client) message.StreamFunc + planFunc func(*client) planFuncType + }{ + { + name: "basic services pipeline", + failure: false, + pipeline: "testdata/build/services/basic.yml", + messageKey: "service", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamService + }, + planFunc: func(c *client) planFuncType { + return c.PlanService + }, + ctn: &pipeline.Container{ + ID: "service_github_octocat_1_postgres", + Detach: true, + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "postgres:latest", + Name: "postgres", + Number: 1, + Ports: []string{"5432:5432"}, + Pull: "not_present", + }, + }, + { + name: "basic services pipeline with StreamService failure", + failure: false, + pipeline: "testdata/build/services/basic.yml", + messageKey: "service", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamService + }, + planFunc: func(c *client) planFuncType { + // simulate failure to call PlanService + return planNothing + }, + ctn: &pipeline.Container{ + ID: "service_github_octocat_1_postgres", + Detach: true, + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "postgres:latest", + Name: "postgres", + Number: 1, + Ports: []string{"5432:5432"}, + Pull: "not_present", + }, + }, + { + name: "basic steps pipeline", + failure: false, + pipeline: "testdata/build/steps/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + return c.PlanStep + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, + { + name: "basic steps pipeline with StreamStep failure", + failure: false, + pipeline: "testdata/build/steps/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + // simulate failure to call PlanStep + return planNothing + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, + { + name: "basic stages pipeline", + failure: false, + pipeline: "testdata/build/stages/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + return c.PlanStep + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + buildCtx, done := context.WithCancel(context.Background()) + defer done() + + streamRequests := make(chan message.StreamRequest) + + _pipeline, err := compiler. + WithBuild(_build). + WithRepo(_repo). + WithLocal(true). + WithUser(_user). + Compile(test.pipeline) + if err != nil { + t.Errorf("unable to compile pipeline %s: %v", test.pipeline, err) + } + + _engine, err := New( + WithBuild(_build), + WithPipeline(_pipeline), + WithRepo(_repo), + WithRuntime(_runtime), + WithUser(_user), + withStreamRequests(streamRequests), + ) + if err != nil { + t.Errorf("unable to create executor engine: %v", err) + } + + // run create to init steps to be created properly + err = _engine.CreateBuild(buildCtx) + if err != nil { + t.Errorf("unable to create build: %v", err) + } + + // simulate ExecBuild() + go func() { + // ExecBuild calls PlanService()/PlanStep() before ExecService()/ExecStep() + // (ExecStage() calls PlanStep() before ExecStep()). + _engine.err = test.planFunc(_engine)(buildCtx, test.ctn) + + // ExecService()/ExecStep()/secret.exec() send this message + streamRequests <- message.StreamRequest{ + Key: test.messageKey, + Stream: test.streamFunc(_engine), + Container: test.ctn, + } + + // simulate exec build duration + time.Sleep(100 * time.Microsecond) + + // signal the end of the build so StreamBuild can terminate + done() + }() + + err = _engine.StreamBuild(buildCtx) + + if test.failure { + if err == nil { + t.Errorf("StreamBuild for %s should have returned err", test.pipeline) + } + + return // continue to next test + } + + if err != nil { + t.Errorf("StreamBuild for %s returned err: %v", test.pipeline, err) + } + }) + } +} + func TestLocal_DestroyBuild(t *testing.T) { // setup types compiler, _ := native.New(cli.NewContext(nil, flag.NewFlagSet("test", 0), nil)) diff --git a/executor/local/local.go b/executor/local/local.go index 6674931a..5fb45c3e 100644 --- a/executor/local/local.go +++ b/executor/local/local.go @@ -5,11 +5,13 @@ package local import ( + "reflect" "sync" "github.com/go-vela/sdk-go/vela" "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime" ) @@ -22,17 +24,39 @@ type ( Version string // private fields - init *pipeline.Container - build *library.Build - pipeline *pipeline.Build - repo *library.Repo - services sync.Map - steps sync.Map - user *library.User - err error + init *pipeline.Container + build *library.Build + pipeline *pipeline.Build + repo *library.Repo + services sync.Map + steps sync.Map + user *library.User + err error + streamRequests chan message.StreamRequest } ) +// equal returns true if the other client is the equivalent. +func Equal(a, b *client) bool { + // handle any nil comparisons + if a == nil || b == nil { + return a == nil && b == nil + } + + return reflect.DeepEqual(a.Vela, b.Vela) && + reflect.DeepEqual(a.Runtime, b.Runtime) && + a.Hostname == b.Hostname && + a.Version == b.Version && + reflect.DeepEqual(a.init, b.init) && + reflect.DeepEqual(a.build, b.build) && + reflect.DeepEqual(a.pipeline, b.pipeline) && + reflect.DeepEqual(a.repo, b.repo) && + reflect.DeepEqual(&a.services, &b.services) && + reflect.DeepEqual(&a.steps, &b.steps) && + reflect.DeepEqual(a.user, b.user) && + reflect.DeepEqual(a.err, b.err) +} + // New returns an Executor implementation that integrates with the local system. // // nolint: revive // ignore unexported type as it is intentional @@ -40,6 +64,9 @@ func New(opts ...Opt) (*client, error) { // create new local client c := new(client) + // instantiate streamRequests channel (which may be overridden using withStreamRequests()). + c.streamRequests = make(chan message.StreamRequest) + // apply all provided configuration options for _, opt := range opts { err := opt(c) diff --git a/executor/local/local_test.go b/executor/local/local_test.go index 083a0ca3..113f4e70 100644 --- a/executor/local/local_test.go +++ b/executor/local/local_test.go @@ -20,6 +20,94 @@ import ( "github.com/go-vela/types/pipeline" ) +func TestEqual(t *testing.T) { + // setup types + gin.SetMode(gin.TestMode) + + s := httptest.NewServer(server.FakeHandler()) + + _client, err := vela.NewClient(s.URL, "", nil) + if err != nil { + t.Errorf("unable to create Vela API client: %v", err) + } + + _runtime, err := docker.NewMock() + if err != nil { + t.Errorf("unable to create runtime engine: %v", err) + } + + _local, err := New( + WithBuild(testBuild()), + WithHostname("localhost"), + WithPipeline(testSteps()), + WithRepo(testRepo()), + WithRuntime(_runtime), + WithUser(testUser()), + WithVelaClient(_client), + ) + if err != nil { + t.Errorf("unable to create local executor: %v", err) + } + + _alternate, err := New( + WithBuild(testBuild()), + WithHostname("a.different.host"), + WithPipeline(testSteps()), + WithRepo(testRepo()), + WithRuntime(_runtime), + WithUser(testUser()), + WithVelaClient(_client), + ) + if err != nil { + t.Errorf("unable to create alternate local executor: %v", err) + } + + tests := []struct { + name string + a *client + b *client + want bool + }{ + { + name: "both nil", + a: nil, + b: nil, + want: true, + }, + { + name: "left nil", + a: nil, + b: _local, + want: false, + }, + { + name: "right nil", + a: _local, + b: nil, + want: false, + }, + { + name: "equal", + a: _local, + b: _local, + want: true, + }, + { + name: "not equal", + a: _local, + b: _alternate, + want: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if got := Equal(test.a, test.b); got != test.want { + t.Errorf("Equal() = %v, want %v", got, test.want) + } + }) + } +} + func TestLocal_New(t *testing.T) { // setup types gin.SetMode(gin.TestMode) diff --git a/executor/local/opts.go b/executor/local/opts.go index 6b7a489a..d4b0ba55 100644 --- a/executor/local/opts.go +++ b/executor/local/opts.go @@ -7,6 +7,7 @@ package local import ( "fmt" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime" "github.com/go-vela/sdk-go/vela" @@ -119,3 +120,14 @@ func WithVersion(version string) Opt { return nil } } + +// withStreamRequests sets the streamRequests channel in the executor client for Linux +// (primarily used for tests). +func withStreamRequests(s chan message.StreamRequest) Opt { + return func(c *client) error { + // set the streamRequests channel in the client + c.streamRequests = s + + return nil + } +} diff --git a/executor/local/service.go b/executor/local/service.go index bc5269c5..eba3ee96 100644 --- a/executor/local/service.go +++ b/executor/local/service.go @@ -11,6 +11,7 @@ import ( "os" "time" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/internal/service" "github.com/go-vela/types/constants" @@ -96,13 +97,12 @@ func (c *client) ExecService(ctx context.Context, ctn *pipeline.Container) error return err } - go func() { - // stream logs from container - err := c.StreamService(context.Background(), ctn) - if err != nil { - fmt.Fprintln(os.Stdout, "unable to stream logs for service:", err) - } - }() + // trigger StreamService goroutine with logging context + c.streamRequests <- message.StreamRequest{ + Key: "service", + Stream: c.StreamService, + Container: ctn, + } return nil } diff --git a/executor/local/service_test.go b/executor/local/service_test.go index cc619c4d..304c28ae 100644 --- a/executor/local/service_test.go +++ b/executor/local/service_test.go @@ -8,6 +8,7 @@ import ( "context" "testing" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime/docker" "github.com/go-vela/types/library" @@ -180,6 +181,9 @@ func TestLocal_ExecService(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + // setup tests tests := []struct { name string @@ -232,6 +236,7 @@ func TestLocal_ExecService(t *testing.T) { WithRepo(_repo), WithRuntime(_runtime), WithUser(_user), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) diff --git a/executor/local/stage_test.go b/executor/local/stage_test.go index c5f9559b..2aa5f887 100644 --- a/executor/local/stage_test.go +++ b/executor/local/stage_test.go @@ -15,6 +15,7 @@ import ( "github.com/go-vela/server/compiler/native" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime/docker" "github.com/go-vela/types/pipeline" @@ -260,6 +261,9 @@ func TestLocal_ExecStage(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + // setup tests tests := []struct { name string @@ -316,6 +320,7 @@ func TestLocal_ExecStage(t *testing.T) { WithRepo(_repo), WithRuntime(_runtime), WithUser(_user), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) diff --git a/executor/local/step.go b/executor/local/step.go index 032ccbbf..27c6bcac 100644 --- a/executor/local/step.go +++ b/executor/local/step.go @@ -14,6 +14,7 @@ import ( "github.com/go-vela/types/constants" "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/internal/step" ) @@ -103,14 +104,12 @@ func (c *client) ExecStep(ctx context.Context, ctn *pipeline.Container) error { return err } - go func() { - // stream logs from container - err := c.StreamStep(context.Background(), ctn) - if err != nil { - // TODO: Should this be changed or removed? - fmt.Println(err) - } - }() + // trigger StreamStep goroutine with logging context + c.streamRequests <- message.StreamRequest{ + Key: "step", + Stream: c.StreamStep, + Container: ctn, + } // do not wait for detached containers if ctn.Detach { diff --git a/executor/local/step_test.go b/executor/local/step_test.go index 5a549368..8d452a92 100644 --- a/executor/local/step_test.go +++ b/executor/local/step_test.go @@ -8,6 +8,7 @@ import ( "context" "testing" + "github.com/go-vela/worker/internal/message" "github.com/go-vela/worker/runtime/docker" "github.com/go-vela/types/library" @@ -187,6 +188,9 @@ func TestLocal_ExecStep(t *testing.T) { t.Errorf("unable to create runtime engine: %v", err) } + streamRequests, done := message.MockStreamRequestsWithCancel(context.Background()) + defer done() + // setup tests tests := []struct { name string @@ -262,6 +266,7 @@ func TestLocal_ExecStep(t *testing.T) { WithRepo(_repo), WithRuntime(_runtime), WithUser(_user), + withStreamRequests(streamRequests), ) if err != nil { t.Errorf("unable to create executor engine: %v", err) diff --git a/executor/setup_test.go b/executor/setup_test.go index 2c44193c..069993d1 100644 --- a/executor/setup_test.go +++ b/executor/setup_test.go @@ -6,10 +6,10 @@ package executor import ( "net/http/httptest" - "reflect" "testing" "github.com/gin-gonic/gin" + "github.com/google/go-cmp/cmp" "github.com/go-vela/server/mock/server" @@ -111,8 +111,10 @@ func TestExecutor_Setup_Linux(t *testing.T) { t.Errorf("Linux returned err: %v", err) } - if !reflect.DeepEqual(got, want) { - t.Errorf("Linux is %v, want %v", got, want) + // Comparing with reflect.DeepEqual(x, y interface) panics due to the + // unexported streamRequests channel. + if diff := cmp.Diff(want, got, cmp.Comparer(linux.Equal)); diff != "" { + t.Errorf("linux Engine mismatch (-want +got):\n%v", diff) } } @@ -164,8 +166,10 @@ func TestExecutor_Setup_Local(t *testing.T) { t.Errorf("Local returned err: %v", err) } - if !reflect.DeepEqual(got, want) { - t.Errorf("Local is %v, want %v", got, want) + // Comparing with reflect.DeepEqual(x, y interface) panics due to the + // unexported streamRequests channel. + if diff := cmp.Diff(want, got, cmp.Comparer(local.Equal)); diff != "" { + t.Errorf("local Engine mismatch (-want +got):\n%v", diff) } } diff --git a/internal/message/doc.go b/internal/message/doc.go new file mode 100644 index 00000000..10bad174 --- /dev/null +++ b/internal/message/doc.go @@ -0,0 +1,11 @@ +// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +// Package message provides message types used in the executor. +// These types have to be in a separate package to prevent circular imports. +// +// Usage: +// +// import "github.com/go-vela/worker/internal/message" +package message diff --git a/internal/message/stream.go b/internal/message/stream.go new file mode 100644 index 00000000..9534f38e --- /dev/null +++ b/internal/message/stream.go @@ -0,0 +1,44 @@ +// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +package message + +import ( + "context" + + "github.com/go-vela/types/pipeline" +) + +// StreamFunc is either StreamService or StreamStep in executor.Engine. +type StreamFunc = func(context.Context, *pipeline.Container) error + +// StreamRequest is the message used to begin streaming for a container +// (requests goes from ExecService / ExecStep to StreamBuild in executor). +type StreamRequest struct { + // Key is either "service" or "step". + Key string + // Stream is either Engine.StreamService or Engine.StreamStep. + Stream StreamFunc + // Container is the container for the service or step to stream logs for. + Container *pipeline.Container +} + +// MockStreamRequestsWithCancel discards all requests until you call the cancel function. +func MockStreamRequestsWithCancel(ctx context.Context) (chan StreamRequest, context.CancelFunc) { + cancelCtx, done := context.WithCancel(ctx) + streamRequests := make(chan StreamRequest) + + // discard all stream requests + go func() { + for { + select { + case <-streamRequests: + case <-cancelCtx.Done(): + return + } + } + }() + + return streamRequests, done +}