From 43a2c36f669ed2e757def97c901e1f31ee9b81c6 Mon Sep 17 00:00:00 2001
From: Jacob Floyd
Date: Mon, 12 Dec 2022 22:35:28 -0600
Subject: [PATCH 1/4] fix(linux executor): make sure StreamBuild does not hang

This makes ExecBuild close the streamRequests channel when it finishes.
This allows StreamBuild to stop waiting for new StreamRequest messages
because it knows that no more can be sent.
---
 executor/linux/build.go      |  25 +++++--
 executor/linux/build_test.go | 130 +++++++++++++++++++++++++++++------
 executor/linux/linux.go      |   1 +
 3 files changed, 128 insertions(+), 28 deletions(-)

diff --git a/executor/linux/build.go b/executor/linux/build.go
index f57656a9..3e7e8063 100644
--- a/executor/linux/build.go
+++ b/executor/linux/build.go
@@ -458,10 +458,17 @@ func (c *client) AssembleBuild(ctx context.Context) error {
 
 // ExecBuild runs a pipeline for a build.
 func (c *client) ExecBuild(ctx context.Context) error {
-	// defer an upload of the build
-	//
-	// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
-	defer func() { build.Upload(c.build, c.Vela, c.err, c.Logger, c.repo) }()
+	defer func() {
+		// Exec* calls are responsible for sending StreamRequest messages.
+		// close the channel at the end of ExecBuild to signal that
+		// nothing else will send more StreamRequest messages.
+		close(c.streamRequests)
+
+		// defer an upload of the build
+		//
+		// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
+		build.Upload(c.build, c.Vela, c.err, c.Logger, c.repo)
+	}()
 
 	// execute the services for the pipeline
 	for _, _service := range c.pipeline.Services {
@@ -599,7 +606,13 @@ func (c *client) StreamBuild(ctx context.Context) error {
 
 	for {
 		select {
-		case req := <-c.streamRequests:
+		case req, ok := <-c.streamRequests:
+			if !ok {
+				// ExecBuild is done requesting streams
+				c.Logger.Debug("not accepting any more stream requests as channel is closed")
+				return nil
+			}
+
 			streams.Go(func() error {
 				// update engine logger with step metadata
 				//
@@ -616,7 +629,7 @@
 				return nil
 			})
 		case <-delayedCtx.Done():
-			c.Logger.Debug("streaming context canceled")
+			c.Logger.Debug("not accepting any more stream requests as streaming context is canceled")
 			// build done or canceled
 			return nil
 		}
diff --git a/executor/linux/build_test.go b/executor/linux/build_test.go
index 97a99136..3b2ec599 100644
--- a/executor/linux/build_test.go
+++ b/executor/linux/build_test.go
@@ -918,9 +918,6 @@ func TestLinux_ExecBuild(t *testing.T) {
 		t.Errorf("unable to create docker runtime engine: %v", err)
 	}
 
-	streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
-	defer done()
-
 	tests := []struct {
 		name     string
 		failure  bool
@@ -972,6 +969,9 @@ func TestLinux_ExecBuild(t *testing.T) {
 				t.Errorf("unable to compile %s pipeline %s: %v", test.name, test.pipeline, err)
 			}
 
+			streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
+			defer done()
+
 			_engine, err := New(
 				WithBuild(_build),
 				WithPipeline(_pipeline),
@@ -1073,13 +1073,16 @@ func TestLinux_StreamBuild(t *testing.T) {
 	}
 
 	tests := []struct {
-		name       string
-		failure    bool
-		pipeline   string
-		messageKey string
-		ctn        *pipeline.Container
-		streamFunc func(*client) message.StreamFunc
-		planFunc   func(*client) planFuncType
+		name           string
+		failure        bool
+		earlyExecExit  bool
+		earlyBuildDone bool
+		pipeline       string
+		msgCount       int
+		messageKey     string
+		ctn            *pipeline.Container
+		streamFunc     func(*client) message.StreamFunc
+		planFunc       func(*client) planFuncType
 	}{
 		{
name: "docker-basic services pipeline", @@ -1214,6 +1217,72 @@ func TestLinux_StreamBuild(t *testing.T) { Pull: "not_present", }, }, + { + name: "docker-early exit from ExecBuild", + failure: false, + earlyExecExit: true, + pipeline: "testdata/build/steps/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + return c.PlanStep + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, + { + name: "docker-build complete before ExecBuild called", + failure: false, + earlyBuildDone: true, + pipeline: "testdata/build/steps/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + return c.PlanStep + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, + { + name: "docker-early exit from ExecBuild and build complete signaled", + failure: false, + earlyExecExit: true, + pipeline: "testdata/build/steps/basic.yml", + messageKey: "step", + streamFunc: func(c *client) message.StreamFunc { + return c.StreamStep + }, + planFunc: func(c *client) planFuncType { + return c.PlanStep + }, + ctn: &pipeline.Container{ + ID: "step_github_octocat_1_test", + Directory: "/vela/src/github.com/github/octocat", + Environment: map[string]string{"FOO": "bar"}, + Image: "alpine:latest", + Name: "test", + Number: 1, + Pull: "not_present", + }, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -1255,21 +1324,38 @@ func TestLinux_StreamBuild(t *testing.T) { // simulate ExecBuild() which runs concurrently with StreamBuild() go func() { - // ExecBuild calls PlanService()/PlanStep() before ExecService()/ExecStep() - // (ExecStage() calls PlanStep() before ExecStep()). - _engine.err = test.planFunc(_engine)(buildCtx, test.ctn) - - // ExecService()/ExecStep()/secret.exec() send this message - streamRequests <- message.StreamRequest{ - Key: test.messageKey, - Stream: test.streamFunc(_engine), - Container: test.ctn, + if test.earlyBuildDone { + // imitate build getting canceled or otherwise finishing before ExecBuild gets called. + done() + } + if test.earlyExecExit { + // imitate a failure after ExecBuild starts and before it sends a StreamRequest. + close(streamRequests) + } + if test.earlyBuildDone || test.earlyExecExit { + return } - // simulate exec build duration - time.Sleep(100 * time.Microsecond) + // simulate two messages of the same type + for i := 0; i < 2; i++ { + // ExecBuild calls PlanService()/PlanStep() before ExecService()/ExecStep() + // (ExecStage() calls PlanStep() before ExecStep()). 
+					_engine.err = test.planFunc(_engine)(buildCtx, test.ctn)
+
+					// ExecService()/ExecStep()/secret.exec() send this message
+					streamRequests <- message.StreamRequest{
+						Key:    test.messageKey,
+						Stream: test.streamFunc(_engine),
+						// in a real pipeline, the second message would be for a different container
+						Container: test.ctn,
+					}
+
+					// simulate exec build duration
+					time.Sleep(100 * time.Microsecond)
+				}
 
-				// signal the end of the build so StreamBuild can terminate
+				// signal the end of ExecBuild so StreamBuild can finish up
+				close(streamRequests)
 				done()
 			}()
 
diff --git a/executor/linux/linux.go b/executor/linux/linux.go
index 4a78d2fb..c3df35c8 100644
--- a/executor/linux/linux.go
+++ b/executor/linux/linux.go
@@ -107,6 +107,7 @@ func New(opts ...Opt) (*client, error) {
 	c.Logger = logrus.NewEntry(logger)
 
 	// instantiate streamRequests channel (which may be overridden using withStreamRequests()).
+	// messages get sent during ExecBuild, then ExecBuild closes this on exit.
 	c.streamRequests = make(chan message.StreamRequest)
 
 	// apply all provided configuration options

From 64070476c70f5d5be5ff67bba6fc97b8becb39fa Mon Sep 17 00:00:00 2001
From: Jacob Floyd
Date: Mon, 12 Dec 2022 22:36:23 -0600
Subject: [PATCH 2/4] fix(linux executor): wait for context before reporting all stream functions done

---
 executor/linux/build.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/executor/linux/build.go b/executor/linux/build.go
index 3e7e8063..8523827e 100644
--- a/executor/linux/build.go
+++ b/executor/linux/build.go
@@ -593,6 +593,10 @@ func (c *client) StreamBuild(ctx context.Context) error {
 		}
 
 		cancelStreaming()
+		// wait for context to be done before reporting that everything has returned.
+		<-delayedCtx.Done()
+		// there might be one more log message from WithDelayedCancelPropagation
+		// but there's not a good way to wait for that goroutine to finish.
 		c.Logger.Info("all stream functions have returned")
 	}()
 

From e4e722f8d2f19e227943955100672798a79d7d5b Mon Sep 17 00:00:00 2001
From: Jacob Floyd
Date: Mon, 12 Dec 2022 22:38:51 -0600
Subject: [PATCH 3/4] fix(operator): move DestroyBuild defer so it gets called last

The order of defer calls in operator.exec() was sub-optimal. Before, we
were not canceling the buildCtx and timeoutCtx until after calling
DestroyBuild. Now, goroutines get signaled to finish up before
DestroyBuild gets called.
---
 cmd/vela-worker/exec.go | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/cmd/vela-worker/exec.go b/cmd/vela-worker/exec.go
index a8a2b80d..01774c7f 100644
--- a/cmd/vela-worker/exec.go
+++ b/cmd/vela-worker/exec.go
@@ -91,6 +91,21 @@ func (w *Worker) exec(index int) error {
 	// add the executor to the worker
 	w.Executors[index] = _executor
 
+	// this gets deferred first so that DestroyBuild runs AFTER the
+	// new contexts (buildCtx and timeoutCtx) have been canceled
+	defer func() {
+		logger.Info("destroying build")
+
+		// destroy the build with the executor (pass a background
+		// context to guarantee all build resources are destroyed).
+		err = _executor.DestroyBuild(context.Background())
+		if err != nil {
+			logger.Errorf("unable to destroy build: %v", err)
+		}
+
+		logger.Info("completed build")
+	}()
+
 	// capture the configured build timeout
 	t := w.Config.Build.Timeout
 	// check if the repository has a custom timeout
@@ -109,19 +124,6 @@ func (w *Worker) exec(index int) error {
 	timeoutCtx, timeout := context.WithTimeout(buildCtx, t)
 	defer timeout()
 
-	defer func() {
-		logger.Info("destroying build")
-
-		// destroy the build with the executor (pass a background
-		// context to guarantee all build resources are destroyed).
-		err = _executor.DestroyBuild(context.Background())
-		if err != nil {
-			logger.Errorf("unable to destroy build: %v", err)
-		}
-
-		logger.Info("completed build")
-	}()
-
 	logger.Info("creating build")
 	// create the build with the executor
 	err = _executor.CreateBuild(timeoutCtx)

From 9ccb1bf0f6d215f930e1089ec6699eb14af63095 Mon Sep 17 00:00:00 2001
From: Jacob Floyd
Date: Fri, 18 Nov 2022 00:24:15 -0600
Subject: [PATCH 4/4] Ensure build is not destroyed until streaming returns

---
 cmd/vela-worker/exec.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/cmd/vela-worker/exec.go b/cmd/vela-worker/exec.go
index 01774c7f..06a60ae8 100644
--- a/cmd/vela-worker/exec.go
+++ b/cmd/vela-worker/exec.go
@@ -6,6 +6,7 @@ package main
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/go-vela/worker/executor"
@@ -91,9 +92,15 @@ func (w *Worker) exec(index int) error {
 	// add the executor to the worker
 	w.Executors[index] = _executor
 
+	// This WaitGroup delays calling DestroyBuild until the StreamBuild goroutine finishes.
+	var wg sync.WaitGroup
+
 	// this gets deferred first so that DestroyBuild runs AFTER the
 	// new contexts (buildCtx and timeoutCtx) have been canceled
 	defer func() {
+		// if exec() exits before starting StreamBuild, this returns immediately.
+		wg.Wait()
+
 		logger.Info("destroying build")
 
 		// destroy the build with the executor (pass a background
@@ -140,8 +147,12 @@ func (w *Worker) exec(index int) error {
 		return nil
 	}
 
+	// add StreamBuild goroutine to WaitGroup
+	wg.Add(1)
+
 	// log/event streaming uses buildCtx so that it is not subject to the timeout.
 	go func() {
+		defer wg.Done()
 		logger.Info("streaming build logs")
 		// execute the build with the executor
 		err = _executor.StreamBuild(buildCtx)
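
Taken together, the four patches make shutdown an explicit handoff: ExecBuild closes the streamRequests channel when it finishes, StreamBuild returns once the channel is closed or the build context is done, and the worker defers DestroyBuild behind a WaitGroup so it only runs after the StreamBuild goroutine has returned. The standalone sketch below illustrates that close-channel/WaitGroup coordination pattern; the names and types are hypothetical stand-ins, not the go-vela/worker API.

// shutdown_sketch.go — illustrative only; execBuild, streamBuild, and
// streamRequest are hypothetical stand-ins for the patched functions.
package main

import (
	"context"
	"fmt"
	"sync"
)

type streamRequest struct{ key string }

// execBuild stands in for ExecBuild: it sends requests and closes the
// channel on exit so the consumer knows no more requests are coming.
func execBuild(reqs chan<- streamRequest) {
	defer close(reqs)

	for i := 0; i < 2; i++ {
		reqs <- streamRequest{key: fmt.Sprintf("step-%d", i)}
	}
}

// streamBuild stands in for StreamBuild: it drains the channel until the
// channel is closed or the build context is canceled.
func streamBuild(ctx context.Context, reqs <-chan streamRequest) {
	for {
		select {
		case req, ok := <-reqs:
			if !ok {
				// producer closed the channel; nothing more to stream
				return
			}
			fmt.Println("streaming", req.key)
		case <-ctx.Done():
			// build done or canceled
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	reqs := make(chan streamRequest)

	// the WaitGroup plays the role of the one added in PATCH 4/4: cleanup
	// must not start until the streaming goroutine has returned.
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()
		streamBuild(ctx, reqs)
	}()

	execBuild(reqs)

	wg.Wait()
	fmt.Println("destroying build") // stands in for the deferred DestroyBuild
}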