Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: improve streaming coordination so it does not hang #410

Merged
merged 6 commits into the base branch from the PR branch (branch names not captured)
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main

import (
"context"
"sync"
"time"

"github.com/go-vela/worker/executor"
Expand Down Expand Up @@ -91,6 +92,27 @@ func (w *Worker) exec(index int) error {
// add the executor to the worker
w.Executors[index] = _executor

// This WaitGroup delays calling DestroyBuild until the StreamBuild goroutine finishes.
var wg sync.WaitGroup

// this gets deferred first so that DestroyBuild runs AFTER the
// new contexts (buildCtx and timeoutCtx) have been canceled
defer func() {
// if exec() exits before starting StreamBuild, this returns immediately.
wg.Wait()

logger.Info("destroying build")

// destroy the build with the executor (pass a background
// context to guarantee all build resources are destroyed).
err = _executor.DestroyBuild(context.Background())
if err != nil {
logger.Errorf("unable to destroy build: %v", err)
}

logger.Info("completed build")
}()

// capture the configured build timeout
t := w.Config.Build.Timeout
// check if the repository has a custom timeout
Expand All @@ -109,19 +131,6 @@ func (w *Worker) exec(index int) error {
timeoutCtx, timeout := context.WithTimeout(buildCtx, t)
defer timeout()

defer func() {
logger.Info("destroying build")

// destroy the build with the executor (pass a background
// context to guarantee all build resources are destroyed).
err = _executor.DestroyBuild(context.Background())
if err != nil {
logger.Errorf("unable to destroy build: %v", err)
}

logger.Info("completed build")
}()

logger.Info("creating build")
// create the build with the executor
err = _executor.CreateBuild(timeoutCtx)
Expand All @@ -138,8 +147,12 @@ func (w *Worker) exec(index int) error {
return nil
}

// add StreamBuild goroutine to WaitGroup
wg.Add(1)

// log/event streaming uses buildCtx so that it is not subject to the timeout.
go func() {
defer wg.Done()
logger.Info("streaming build logs")
// execute the build with the executor
err = _executor.StreamBuild(buildCtx)
Expand Down
29 changes: 23 additions & 6 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,17 @@ func (c *client) AssembleBuild(ctx context.Context) error {

// ExecBuild runs a pipeline for a build.
func (c *client) ExecBuild(ctx context.Context) error {
// defer an upload of the build
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
defer func() { build.Upload(c.build, c.Vela, c.err, c.Logger, c.repo) }()
defer func() {
// Exec* calls are responsible for sending StreamRequest messages.
// close the channel at the end of ExecBuild to signal that
// nothing else will send more StreamRequest messages.
close(c.streamRequests)

// defer an upload of the build
//
// https://pkg.go.dev/github.com/go-vela/worker/internal/build#Upload
build.Upload(c.build, c.Vela, c.err, c.Logger, c.repo)
}()

// execute the services for the pipeline
for _, _service := range c.pipeline.Services {
Expand Down Expand Up @@ -599,6 +606,10 @@ func (c *client) StreamBuild(ctx context.Context) error {
}

cancelStreaming()
// wait for context to be done before reporting that everything has returned.
<-delayedCtx.Done()
// there might be one more log message from WithDelayedCancelPropagation
// but there's not a good way to wait for that goroutine to finish.

c.Logger.Info("all stream functions have returned")
}()
Expand All @@ -612,7 +623,13 @@ func (c *client) StreamBuild(ctx context.Context) error {

for {
select {
case req := <-c.streamRequests:
case req, ok := <-c.streamRequests:
if !ok {
// ExecBuild is done requesting streams
c.Logger.Debug("not accepting any more stream requests as channel is closed")
return nil
}

streams.Go(func() error {
// update engine logger with step metadata
//
Expand All @@ -629,7 +646,7 @@ func (c *client) StreamBuild(ctx context.Context) error {
return nil
})
case <-delayedCtx.Done():
c.Logger.Debug("streaming context canceled")
c.Logger.Debug("not accepting any more stream requests as streaming context is canceled")
// build done or canceled
return nil
}
Expand Down
130 changes: 108 additions & 22 deletions executor/linux/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,9 +1146,6 @@ func TestLinux_ExecBuild(t *testing.T) {
t.Errorf("unable to create docker runtime engine: %v", err)
}

streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
defer done()

tests := []struct {
name string
failure bool
Expand Down Expand Up @@ -1200,6 +1197,9 @@ func TestLinux_ExecBuild(t *testing.T) {
t.Errorf("unable to compile %s pipeline %s: %v", test.name, test.pipeline, err)
}

streamRequests, done := message.MockStreamRequestsWithCancel(context.Background())
defer done()

_engine, err := New(
WithBuild(_build),
WithPipeline(_pipeline),
Expand Down Expand Up @@ -1301,13 +1301,16 @@ func TestLinux_StreamBuild(t *testing.T) {
}

tests := []struct {
name string
failure bool
pipeline string
messageKey string
ctn *pipeline.Container
streamFunc func(*client) message.StreamFunc
planFunc func(*client) planFuncType
name string
failure bool
earlyExecExit bool
earlyBuildDone bool
pipeline string
msgCount int
messageKey string
ctn *pipeline.Container
streamFunc func(*client) message.StreamFunc
planFunc func(*client) planFuncType
}{
{
name: "docker-basic services pipeline",
Expand Down Expand Up @@ -1442,6 +1445,72 @@ func TestLinux_StreamBuild(t *testing.T) {
Pull: "not_present",
},
},
{
name: "docker-early exit from ExecBuild",
failure: false,
earlyExecExit: true,
pipeline: "testdata/build/steps/basic.yml",
messageKey: "step",
streamFunc: func(c *client) message.StreamFunc {
return c.StreamStep
},
planFunc: func(c *client) planFuncType {
return c.PlanStep
},
ctn: &pipeline.Container{
ID: "step_github_octocat_1_test",
Directory: "/vela/src/github.com/github/octocat",
Environment: map[string]string{"FOO": "bar"},
Image: "alpine:latest",
Name: "test",
Number: 1,
Pull: "not_present",
},
},
{
name: "docker-build complete before ExecBuild called",
failure: false,
earlyBuildDone: true,
pipeline: "testdata/build/steps/basic.yml",
messageKey: "step",
streamFunc: func(c *client) message.StreamFunc {
return c.StreamStep
},
planFunc: func(c *client) planFuncType {
return c.PlanStep
},
ctn: &pipeline.Container{
ID: "step_github_octocat_1_test",
Directory: "/vela/src/github.com/github/octocat",
Environment: map[string]string{"FOO": "bar"},
Image: "alpine:latest",
Name: "test",
Number: 1,
Pull: "not_present",
},
},
{
name: "docker-early exit from ExecBuild and build complete signaled",
failure: false,
earlyExecExit: true,
pipeline: "testdata/build/steps/basic.yml",
messageKey: "step",
streamFunc: func(c *client) message.StreamFunc {
return c.StreamStep
},
planFunc: func(c *client) planFuncType {
return c.PlanStep
},
ctn: &pipeline.Container{
ID: "step_github_octocat_1_test",
Directory: "/vela/src/github.com/github/octocat",
Environment: map[string]string{"FOO": "bar"},
Image: "alpine:latest",
Name: "test",
Number: 1,
Pull: "not_present",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -1483,21 +1552,38 @@ func TestLinux_StreamBuild(t *testing.T) {

// simulate ExecBuild() which runs concurrently with StreamBuild()
go func() {
// ExecBuild calls PlanService()/PlanStep() before ExecService()/ExecStep()
// (ExecStage() calls PlanStep() before ExecStep()).
_engine.err = test.planFunc(_engine)(buildCtx, test.ctn)

// ExecService()/ExecStep()/secret.exec() send this message
streamRequests <- message.StreamRequest{
Key: test.messageKey,
Stream: test.streamFunc(_engine),
Container: test.ctn,
if test.earlyBuildDone {
// imitate build getting canceled or otherwise finishing before ExecBuild gets called.
done()
}
if test.earlyExecExit {
// imitate a failure after ExecBuild starts and before it sends a StreamRequest.
close(streamRequests)
}
if test.earlyBuildDone || test.earlyExecExit {
return
}

// simulate exec build duration
time.Sleep(100 * time.Microsecond)
// simulate two messages of the same type
for i := 0; i < 2; i++ {
// ExecBuild calls PlanService()/PlanStep() before ExecService()/ExecStep()
// (ExecStage() calls PlanStep() before ExecStep()).
_engine.err = test.planFunc(_engine)(buildCtx, test.ctn)

// ExecService()/ExecStep()/secret.exec() send this message
streamRequests <- message.StreamRequest{
Key: test.messageKey,
Stream: test.streamFunc(_engine),
// in a real pipeline, the second message would be for a different container
Container: test.ctn,
}

// simulate exec build duration
time.Sleep(100 * time.Microsecond)
}

// signal the end of the build so StreamBuild can terminate
// signal the end of ExecBuild so StreamBuild can finish up
close(streamRequests)
done()
}()

Expand Down
1 change: 1 addition & 0 deletions executor/linux/linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func New(opts ...Opt) (*client, error) {
c.Logger = logrus.NewEntry(logger)

// instantiate streamRequests channel (which may be overridden using withStreamRequests()).
// messages get sent during ExecBuild, then ExecBuild closes this on exit.
c.streamRequests = make(chan message.StreamRequest)

// apply all provided configuration options
Expand Down