Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: Make context handling for log streaming more explicit #317

Merged
merged 14 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ func (w *Worker) exec(index int) error {
}

// create a background context
ctx := context.Background()
buildCtx, done := context.WithCancel(context.Background())
defer done()

// add to the background context with a timeout
// built in for ensuring a build doesn't run forever
ctx, timeout := context.WithTimeout(ctx, t)
ctx, timeout := context.WithTimeout(buildCtx, t)
defer timeout()

defer func() {
Expand Down Expand Up @@ -128,6 +129,16 @@ func (w *Worker) exec(index int) error {
return nil
}

// log streaming uses buildCtx so that it is not subject to the timeout.
go func() {
logger.Info("streaming build logs")
// execute the build with the executor
err = _executor.StreamBuild(buildCtx)
if err != nil {
logger.Errorf("unable to stream build logs: %v", err)
}
}()

logger.Info("assembling build")
// assemble the build with the executor
err = _executor.AssembleBuild(ctx)
Expand Down
3 changes: 3 additions & 0 deletions executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Engine interface {
// ExecBuild defines a function that
// runs a pipeline for a build.
ExecBuild(context.Context) error
// StreamBuild defines a function that receives a StreamRequest
// and then runs StreamService or StreamStep in a goroutine.
StreamBuild(context.Context) error
// DestroyBuild defines a function that
// cleans up the build after execution.
DestroyBuild(context.Context) error
Expand Down
26 changes: 18 additions & 8 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/gin-gonic/gin"
"github.com/google/go-cmp/cmp"

"github.com/go-vela/server/mock/server"

Expand Down Expand Up @@ -77,6 +78,7 @@ func TestExecutor_New(t *testing.T) {
failure bool
setup *Setup
want Engine
equal interface{}
}{
{
name: "driver-darwin",
Expand All @@ -91,7 +93,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
{
name: "driver-linux",
Expand All @@ -108,7 +111,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: _linux,
want: _linux,
equal: linux.Equal,
},
{
name: "driver-local",
Expand All @@ -123,7 +127,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: _local,
want: _local,
equal: local.Equal,
},
{
name: "driver-windows",
Expand All @@ -138,7 +143,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
{
name: "driver-invalid",
Expand All @@ -153,7 +159,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
{
name: "driver-empty",
Expand All @@ -168,7 +175,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
}

Expand All @@ -193,8 +201,10 @@ func TestExecutor_New(t *testing.T) {
t.Errorf("New returned err: %v", err)
}

if !reflect.DeepEqual(got, test.want) {
t.Errorf("New is %v, want %v", got, test.want)
// Comparing with reflect.DeepEqual(x, y interface) panics due to the
// unexported streamRequests channel.
if diff := cmp.Diff(test.want, got, cmp.Comparer(test.equal)); diff != "" {
t.Errorf("engine mismatch (-want +got):\n%v", diff)
}
})
}
Expand Down
44 changes: 44 additions & 0 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,50 @@ func (c *client) ExecBuild(ctx context.Context) error {
return c.err
}

// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context) error {
// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
streams, streamCtx := errgroup.WithContext(ctx)

defer func() {
c.Logger.Trace("waiting for stream functions to return")

err := streams.Wait()
if err != nil {
c.Logger.Errorf("error in a stream request, %v", err)
}

c.Logger.Info("all stream functions have returned")
}()

for {
select {
case req := <-c.streamRequests:
streams.Go(func() error {
// update engine logger with step metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
logger := c.Logger.WithField(req.Key, req.Container.Name)

logger.Debugf("streaming %s container %s", req.Key, req.Container.ID)

err := req.Stream(streamCtx, req.Container)
if err != nil {
logger.Error(err)
}

return nil
})
case <-ctx.Done():
// build done or canceled
return nil
}
}
}

// DestroyBuild cleans up the build after execution.
func (c *client) DestroyBuild(ctx context.Context) error {
var err error
Expand Down
Loading