Commit bb89524
fix: improve streaming coordination so it does not hang (#410)
cognifloyd authored Dec 16, 2022
1 parent 4eebb4f commit bb89524
Showing 4 changed files with 158 additions and 41 deletions.
39 changes: 26 additions & 13 deletions cmd/vela-worker/exec.go
@@ -6,6 +6,7 @@ package main

import (
"context"
"sync"
"time"

"github.com/go-vela/worker/executor"
@@ -91,6 +92,27 @@ func (w *Worker) exec(index int) error {
// add the executor to the worker
w.Executors[index] = _executor

// This WaitGroup delays calling DestroyBuild until the StreamBuild goroutine finishes.
var wg sync.WaitGroup

// this gets deferred first so that DestroyBuild runs AFTER the
// new contexts (buildCtx and timeoutCtx) have been canceled
defer func() {
// if exec() exits before starting StreamBuild, this returns immediately.
wg.Wait()

logger.Info("destroying build")

// destroy the build with the executor (pass a background
// context to guarantee all build resources are destroyed).
err = _executor.DestroyBuild(context.Background())
if err != nil {
logger.Errorf("unable to destroy build: %v", err)
}

logger.Info("completed build")
}()

// capture the configured build timeout
t := w.Config.Build.Timeout
// check if the repository has a custom timeout
@@ -109,19 +131,6 @@ func (w *Worker) exec(index int) error {
timeoutCtx, timeout := context.WithTimeout(buildCtx, t)
defer timeout()

defer func() {
logger.Info("destroying build")

// destroy the build with the executor (pass a background
// context to guarantee all build resources are destroyed).
err = _executor.DestroyBuild(context.Background())
if err != nil {
logger.Errorf("unable to destroy build: %v", err)
}

logger.Info("completed build")
}()

logger.Info("creating build")
// create the build with the executor
err = _executor.CreateBuild(timeoutCtx)
@@ -138,8 +147,12 @@ func (w *Worker) exec(index int) error {
return nil
}

// add StreamBuild goroutine to WaitGroup
wg.Add(1)

// log/event streaming uses buildCtx so that it is not subject to the timeout.
go func() {
defer wg.Done()
logger.Info("streaming build logs")
// execute the build with the executor
err = _executor.StreamBuild(buildCtx)
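The exec.go change is a common Go coordination pattern: defer the cleanup first so it runs last, and gate it on a WaitGroup so DestroyBuild cannot run while the streaming goroutine is still using the build's resources. Below is a minimal, self-contained sketch of that ordering; streamBuild, destroyBuild, and run are hypothetical stand-ins, not the executor API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// streamBuild stands in for executor.StreamBuild: it keeps running until the
// build context is canceled.
func streamBuild(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("streaming finished")
}

// destroyBuild stands in for executor.DestroyBuild.
func destroyBuild(_ context.Context) {
	fmt.Println("build destroyed")
}

func run() {
	// delays cleanup until the streaming goroutine finishes
	var wg sync.WaitGroup

	// deferred first, so it runs last: only after the contexts below have
	// been canceled and the streaming goroutine has returned.
	defer func() {
		wg.Wait()
		destroyBuild(context.Background())
	}()

	buildCtx, done := context.WithCancel(context.Background())
	defer done()

	timeoutCtx, timeout := context.WithTimeout(buildCtx, 100*time.Millisecond)
	defer timeout()

	wg.Add(1)
	go func() {
		defer wg.Done()
		streamBuild(buildCtx) // uses buildCtx, so streaming is not subject to the timeout
	}()

	// stand-in for the build work that runs under the timeout context
	<-timeoutCtx.Done()
}

func main() {
	run()
	fmt.Println("completed build")
}

Because deferred functions run last-in-first-out, the context cancellations fire before wg.Wait(), so the streaming goroutine is unblocked and allowed to finish before cleanup begins.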
29 changes: 23 additions & 6 deletions executor/linux/build.go
@@ -471,10 +471,17 @@ func (c *client) AssembleBuild(ctx context.Context) error {

// ExecBuild runs a pipeline for a build.
func (c *client) ExecBuild(ctx context.Context) error {
// defer an upload of the build
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
defer func() { build.Upload(c.build, c.Vela, c.err, c.Logger, c.repo) }()
defer func() {
// Exec* calls are responsible for sending StreamRequest messages.
// close the channel at the end of ExecBuild to signal that
// nothing else will send more StreamRequest messages.
close(c.streamRequests)

// defer an upload of the build
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
build.Upload(c.build, c.Vela, c.err, c.Logger, c.repo)
}()

// execute the services for the pipeline
for _, _service := range c.pipeline.Services {
@@ -599,6 +606,10 @@ func (c *client) StreamBuild(ctx context.Context) error {
}

cancelStreaming()
// wait for context to be done before reporting that everything has returned.
<-delayedCtx.Done()
// there might be one more log message from WithDelayedCancelPropagation
// but there's not a good way to wait for that goroutine to finish.

c.Logger.Info("all stream functions have returned")
}()
@@ -612,7 +623,13 @@ func (c *client) StreamBuild(ctx context.Context) error {

for {
select {
case req := <-c.streamRequests:
case req, ok := <-c.streamRequests:
if !ok {
// ExecBuild is done requesting streams
c.Logger.Debug("not accepting any more stream requests as channel is closed")
return nil
}

streams.Go(func() error {
// update engine logger with step metadata
//
@@ -629,7 +646,7 @@ func (c *client) StreamBuild(ctx context.Context) error {
return nil
})
case <-delayedCtx.Done():
c.Logger.Debug("streaming context canceled")
c.Logger.Debug("not accepting any more stream requests as streaming context is canceled")
// build done or canceled
return nil
}
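Taken together, the two build.go hunks establish a producer/consumer handshake: ExecBuild closes streamRequests once it will send no more requests, and StreamBuild's select loop exits on either that close or the (delayed) context cancellation, whichever comes first. The following is a hedged, self-contained sketch of the same contract; streamRequest, execSide, and streamSide are invented names for illustration, not the worker's types.

package main

import (
	"context"
	"fmt"
	"time"
)

// streamRequest is a stand-in for message.StreamRequest.
type streamRequest struct{ key string }

// execSide mimics ExecBuild: it sends stream requests while work progresses,
// then closes the channel so the consumer knows no more requests will come.
func execSide(ctx context.Context, reqs chan<- streamRequest) {
	defer close(reqs)

	for _, key := range []string{"service", "step"} {
		select {
		case reqs <- streamRequest{key: key}:
		case <-ctx.Done():
			return // build canceled before the request could be delivered
		}
	}
}

// streamSide mimics StreamBuild's select loop: it exits when the channel is
// closed or when the streaming context is done, whichever happens first.
func streamSide(ctx context.Context, reqs <-chan streamRequest) {
	for {
		select {
		case req, ok := <-reqs:
			if !ok {
				fmt.Println("channel closed: no more stream requests")
				return
			}
			fmt.Println("streaming logs for", req.key)
		case <-ctx.Done():
			fmt.Println("streaming context canceled")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	reqs := make(chan streamRequest) // unbuffered, like streamRequests
	go execSide(ctx, reqs)
	streamSide(ctx, reqs)
}

Closing the channel rather than sending a sentinel value lets the consumer detect "no more requests" even while it is blocked in the select, and a closed channel can be observed by any number of receivers.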
130 changes: 108 additions & 22 deletions executor/linux/build_test.go
@@ -1146,9 +1146,6 @@ func TestLinux_ExecBuild(t *testing.T) {
t.Errorf("unable to create docker runtime engine: %v", err)
}

streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
defer done()

tests := []struct {
name string
failure bool
@@ -1200,6 +1197,9 @@ func TestLinux_ExecBuild(t *testing.T) {
t.Errorf("unable to compile %s pipeline %s: %v", test.name, test.pipeline, err)
}

streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
defer done()

_engine, err := New(
WithBuild(_build),
WithPipeline(_pipeline),
@@ -1301,13 +1301,16 @@ func TestLinux_StreamBuild(t *testing.T) {
}

tests := []struct {
name string
failure bool
pipeline string
messageKey string
ctn *pipeline.Container
streamFunc func(*client) message.StreamFunc
planFunc func(*client) planFuncType
name string
failure bool
earlyExecExit bool
earlyBuildDone bool
pipeline string
msgCount int
messageKey string
ctn *pipeline.Container
streamFunc func(*client) message.StreamFunc
planFunc func(*client) planFuncType
}{
{
name: "docker-basic services pipeline",
@@ -1442,6 +1445,72 @@ func TestLinux_StreamBuild(t *testing.T) {
Pull: "not_present",
},
},
{
name: "docker-early exit from ExecBuild",
failure: false,
earlyExecExit: true,
pipeline: "testdata/build/steps/basic.yml",
messageKey: "step",
streamFunc: func(c *client) message.StreamFunc {
return c.StreamStep
},
planFunc: func(c *client) planFuncType {
return c.PlanStep
},
ctn: &pipeline.Container{
ID: "step_github_octocat_1_test",
Directory: "/vela/src/github.com/github/octocat",
Environment: map[string]string{"FOO": "bar"},
Image: "alpine:latest",
Name: "test",
Number: 1,
Pull: "not_present",
},
},
{
name: "docker-build complete before ExecBuild called",
failure: false,
earlyBuildDone: true,
pipeline: "testdata/build/steps/basic.yml",
messageKey: "step",
streamFunc: func(c *client) message.StreamFunc {
return c.StreamStep
},
planFunc: func(c *client) planFuncType {
return c.PlanStep
},
ctn: &pipeline.Container{
ID: "step_github_octocat_1_test",
Directory: "/vela/src/github.com/github/octocat",
Environment: map[string]string{"FOO": "bar"},
Image: "alpine:latest",
Name: "test",
Number: 1,
Pull: "not_present",
},
},
{
name: "docker-early exit from ExecBuild and build complete signaled",
failure: false,
earlyExecExit: true,
pipeline: "testdata/build/steps/basic.yml",
messageKey: "step",
streamFunc: func(c *client) message.StreamFunc {
return c.StreamStep
},
planFunc: func(c *client) planFuncType {
return c.PlanStep
},
ctn: &pipeline.Container{
ID: "step_github_octocat_1_test",
Directory: "/vela/src/github.com/github/octocat",
Environment: map[string]string{"FOO": "bar"},
Image: "alpine:latest",
Name: "test",
Number: 1,
Pull: "not_present",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -1483,21 +1552,38 @@ func TestLinux_StreamBuild(t *testing.T) {

// simulate ExecBuild() which runs concurrently with StreamBuild()
go func() {
// ExecBuild calls PlanService()/PlanStep() before ExecService()/ExecStep()
// (ExecStage() calls PlanStep() before ExecStep()).
_engine.err = test.planFunc(_engine)(buildCtx, test.ctn)

// ExecService()/ExecStep()/secret.exec() send this message
streamRequests <- message.StreamRequest{
Key: test.messageKey,
Stream: test.streamFunc(_engine),
Container: test.ctn,
if test.earlyBuildDone {
// imitate build getting canceled or otherwise finishing before ExecBuild gets called.
done()
}
if test.earlyExecExit {
// imitate a failure after ExecBuild starts and before it sends a StreamRequest.
close(streamRequests)
}
if test.earlyBuildDone || test.earlyExecExit {
return
}

// simulate exec build duration
time.Sleep(100 * time.Microsecond)
// simulate two messages of the same type
for i := 0; i < 2; i++ {
// ExecBuild calls PlanService()/PlanStep() before ExecService()/ExecStep()
// (ExecStage() calls PlanStep() before ExecStep()).
_engine.err = test.planFunc(_engine)(buildCtx, test.ctn)

// ExecService()/ExecStep()/secret.exec() send this message
streamRequests <- message.StreamRequest{
Key: test.messageKey,
Stream: test.streamFunc(_engine),
// in a real pipeline, the second message would be for a different container
Container: test.ctn,
}

// simulate exec build duration
time.Sleep(100 * time.Microsecond)
}

// signal the end of the build so StreamBuild can terminate
// signal the end of ExecBuild so StreamBuild can finish up
close(streamRequests)
done()
}()

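The new test cases exercise the early-exit paths by closing the request channel or canceling the mock streaming context before any request is sent. For tests of this shape it can also help to bound the call under test with a deadline, so a reintroduced hang fails quickly instead of stalling the suite. The helper below is a generic sketch of that idea under assumed names (waitForReturn is not part of this repository).

package example

import (
	"context"
	"testing"
	"time"
)

// waitForReturn runs fn in a goroutine and fails the test if it has not
// returned within roughly d. Useful when the regression being guarded
// against is a hang rather than a wrong value.
func waitForReturn(t *testing.T, d time.Duration, fn func(context.Context) error) error {
	t.Helper()

	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()

	errCh := make(chan error, 1)
	go func() { errCh <- fn(ctx) }()

	select {
	case err := <-errCh:
		return err
	case <-time.After(d + 100*time.Millisecond):
		t.Fatal("function under test never returned; possible hang")
		return nil
	}
}

A test could then call, for example, err := waitForReturn(t, time.Second, _engine.StreamBuild) and assert on err the same way the existing cases do.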
1 change: 1 addition & 0 deletions executor/linux/linux.go
@@ -107,6 +107,7 @@ func New(opts ...Opt) (*client, error) {
c.Logger = logrus.NewEntry(logger)

// instantiate streamRequests channel (which may be overridden using withStreamRequests()).
// messages get sent during ExecBuild, then ExecBuild closes this on exit.
c.streamRequests = make(chan message.StreamRequest)

// apply all provided configuration options
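The one-line comment added in linux.go points at the semantics that make the close necessary: streamRequests is created unbuffered, so every send blocks until a receiver is ready to take it. The snippet below is a tiny stand-alone demonstration of that property and of bounding a send with a context; it is illustrative only, not worker code.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	ch := make(chan string) // unbuffered, like streamRequests

	// nothing is receiving from ch, so a bare `ch <- "request"` would block
	// forever; selecting on the context turns that into a bounded wait.
	select {
	case ch <- "request":
		fmt.Println("request delivered")
	case <-ctx.Done():
		fmt.Println("no receiver:", ctx.Err())
	}
}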
