Skip to content

Commit

Permalink
Merge branch 'master' into k8s-pause-running
Browse files Browse the repository at this point in the history
  • Loading branch information
wass3r authored Apr 28, 2022
2 parents 17b5ad0 + c0823e8 commit c55798b
Show file tree
Hide file tree
Showing 31 changed files with 1,068 additions and 89 deletions.
15 changes: 13 additions & 2 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ func (w *Worker) exec(index int) error {
}

// create a background context
ctx := context.Background()
buildCtx, done := context.WithCancel(context.Background())
defer done()

// add to the background context with a timeout
// built in for ensuring a build doesn't run forever
ctx, timeout := context.WithTimeout(ctx, t)
ctx, timeout := context.WithTimeout(buildCtx, t)
defer timeout()

defer func() {
Expand Down Expand Up @@ -128,6 +129,16 @@ func (w *Worker) exec(index int) error {
return nil
}

// log streaming uses buildCtx so that it is not subject to the timeout.
go func() {
logger.Info("streaming build logs")
// execute the build with the executor
err = _executor.StreamBuild(buildCtx)
if err != nil {
logger.Errorf("unable to stream build logs: %v", err)
}
}()

logger.Info("assembling build")
// assemble the build with the executor
err = _executor.AssembleBuild(ctx)
Expand Down
3 changes: 3 additions & 0 deletions executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Engine interface {
// ExecBuild defines a function that
// runs a pipeline for a build.
ExecBuild(context.Context) error
// StreamBuild defines a function that receives a StreamRequest
// and then runs StreamService or StreamStep in a goroutine.
StreamBuild(context.Context) error
// DestroyBuild defines a function that
// cleans up the build after execution.
DestroyBuild(context.Context) error
Expand Down
26 changes: 18 additions & 8 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/gin-gonic/gin"
"github.com/google/go-cmp/cmp"

"github.com/go-vela/server/mock/server"

Expand Down Expand Up @@ -77,6 +78,7 @@ func TestExecutor_New(t *testing.T) {
failure bool
setup *Setup
want Engine
equal interface{}
}{
{
name: "driver-darwin",
Expand All @@ -91,7 +93,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
{
name: "driver-linux",
Expand All @@ -108,7 +111,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: _linux,
want: _linux,
equal: linux.Equal,
},
{
name: "driver-local",
Expand All @@ -123,7 +127,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: _local,
want: _local,
equal: local.Equal,
},
{
name: "driver-windows",
Expand All @@ -138,7 +143,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
{
name: "driver-invalid",
Expand All @@ -153,7 +159,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
{
name: "driver-empty",
Expand All @@ -168,7 +175,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
}

Expand All @@ -193,8 +201,10 @@ func TestExecutor_New(t *testing.T) {
t.Errorf("New returned err: %v", err)
}

if !reflect.DeepEqual(got, test.want) {
t.Errorf("New is %v, want %v", got, test.want)
// Comparing with reflect.DeepEqual(x, y interface) panics due to the
// unexported streamRequests channel.
if diff := cmp.Diff(test.want, got, cmp.Comparer(test.equal)); diff != "" {
t.Errorf("engine mismatch (-want +got):\n%v", diff)
}
})
}
Expand Down
44 changes: 44 additions & 0 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,50 @@ func (c *client) ExecBuild(ctx context.Context) error {
return c.err
}

// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context) error {
// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
streams, streamCtx := errgroup.WithContext(ctx)

defer func() {
c.Logger.Trace("waiting for stream functions to return")

err := streams.Wait()
if err != nil {
c.Logger.Errorf("error in a stream request, %v", err)
}

c.Logger.Info("all stream functions have returned")
}()

for {
select {
case req := <-c.streamRequests:
streams.Go(func() error {
// update engine logger with step metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
logger := c.Logger.WithField(req.Key, req.Container.Name)

logger.Debugf("streaming %s container %s", req.Key, req.Container.ID)

err := req.Stream(streamCtx, req.Container)
if err != nil {
logger.Error(err)
}

return nil
})
case <-ctx.Done():
// build done or canceled
return nil
}
}
}

// DestroyBuild cleans up the build after execution.
func (c *client) DestroyBuild(ctx context.Context) error {
var err error
Expand Down
Loading

0 comments on commit c55798b

Please sign in to comment.