Skip to content

Commit

Permalink
fix(worker): init env var for worker hooks (#6895)
Browse files Browse the repository at this point in the history
* fix(worker): init env var for worker hooks

Signed-off-by: Yvonnick Esnault <[email protected]>
  • Loading branch information
yesnault authored Mar 28, 2024
1 parent 35d9161 commit 655542e
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 61 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
BSD 3-Clause License

Copyright (c) 2013-2022, OVH SAS
Copyright (c) 2013-2024, OVH SAS
All rights reserved.

Redistribution and use in source and binary forms, with or without
Expand Down
24 changes: 17 additions & 7 deletions engine/worker/internal/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (w *CurrentWorker) runAction(ctx context.Context, a sdk.Action, jobID int64
}

//Run children actions
r, nDisabled := w.runSteps(ctx, a.Actions, a, jobID, secrets, actionName)
r, nDisabled := w.runSteps(ctx, a.Actions, jobID, secrets, actionName)
//If all steps are disabled, set action status to disabled
if nDisabled >= len(a.Actions) {
r.Status = sdk.StatusDisabled
Expand All @@ -287,7 +287,7 @@ func (w *CurrentWorker) runAction(ctx context.Context, a sdk.Action, jobID int64
return r
}

func (w *CurrentWorker) runSteps(ctx context.Context, steps []sdk.Action, a sdk.Action, jobID int64, secrets []sdk.Variable, stepName string) (sdk.Result, int) {
func (w *CurrentWorker) runSteps(ctx context.Context, steps []sdk.Action, jobID int64, secrets []sdk.Variable, stepName string) (sdk.Result, int) {
log.Info(ctx, "runSteps> start action steps %s %d len(steps):%d context=%p", stepName, jobID, len(steps), ctx)
defer func() {
log.Info(ctx, "runSteps> end action steps %s %d len(steps):%d context=%p (%s)", stepName, jobID, len(steps), ctx, ctx.Err())
Expand Down Expand Up @@ -461,7 +461,7 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (res sdk.
}

log.Info(ctx, "Executing hooks setup from directory: %s", hdFile.Name())
if err := w.executeHooksSetup(ctx, w.basedir, hdFile.Name()); err != nil {
if err := w.executeHooksSetup(ctx, w.basedir); err != nil {
return sdk.Result{Status: sdk.StatusFail, Reason: fmt.Sprintf("Error: unable to setup hooks: %v", err)}
}

Expand All @@ -472,7 +472,7 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (res sdk.
}

// Teardown worker hooks
if err := w.executeHooksTeardown(ctx, w.basedir, hdFile.Name()); err != nil {
if err := w.executeHooksTeardown(ctx, w.basedir); err != nil {
log.Error(ctx, "error while executing teardown hook scripts: %v", err)
}

Expand Down Expand Up @@ -586,7 +586,7 @@ func (w *CurrentWorker) setupHooks(ctx context.Context, jobInfo sdk.WorkflowNode
return nil
}

func (w *CurrentWorker) executeHooksSetup(ctx context.Context, fs afero.Fs, workingDir string) error {
func (w *CurrentWorker) executeHooksSetup(ctx context.Context, fs afero.Fs) error {
if strings.EqualFold(runtime.GOOS, "windows") {
log.Warn(ctx, "hooks are not supported on windows")
return nil
Expand All @@ -599,7 +599,7 @@ func (w *CurrentWorker) executeHooksSetup(ctx context.Context, fs afero.Fs, work
return sdk.WithStack(fmt.Errorf("invalid given basedir"))
}

workerEnv := w.Environ()
workerEnv := w.getEnvironmentForWorkerHook()

for _, h := range w.hooks {
filepath, err := basedir.RealPath(h.SetupPath)
Expand Down Expand Up @@ -643,7 +643,17 @@ func (w *CurrentWorker) executeHooksSetup(ctx context.Context, fs afero.Fs, work
return nil
}

func (w *CurrentWorker) executeHooksTeardown(ctx context.Context, fs afero.Fs, workingDir string) error {
func (w *CurrentWorker) getEnvironmentForWorkerHook() []string {
var workerEnv []string
for _, v := range w.Environ() {
if strings.HasPrefix(v, "CDS_INTEGRATION_") || strings.HasPrefix(v, "BASEDIR=") {
workerEnv = append(workerEnv, v)
}
}
return workerEnv
}

func (w *CurrentWorker) executeHooksTeardown(ctx context.Context, fs afero.Fs) error {
basedir, ok := fs.(*afero.BasePathFs)
if !ok {
return sdk.WithStack(fmt.Errorf("invalid given basedir"))
Expand Down
102 changes: 50 additions & 52 deletions engine/worker/internal/runV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (w *CurrentWorker) V2ProcessJob() (res sdk.V2WorkflowRunJobResult) {
w.currentJobV2.runJobContext.CDS.Workspace = wdAbs

log.Info(ctx, "Executing hooks setup from directory: %s", hdFile.Name())
if err := w.executeHooksSetupV2(ctx, w.basedir, hdFile.Name()); err != nil {
if err := w.executeHooksSetupV2(ctx, w.basedir); err != nil {
log.ErrorWithStackTrace(ctx, err)
return w.failJob(ctx, fmt.Sprintf("Error: unable to setup hooks: %v", err))
}
Expand Down Expand Up @@ -470,9 +470,6 @@ func (w *CurrentWorker) runPlugin(ctx context.Context, pluginName string, opts m
}

pluginResult := pluginClient.Run(ctx, opts)
if err != nil {
return w.failJob(ctx, fmt.Sprintf("error running plugin %s: %v", pluginName, err))
}

if pluginResult.Status == sdk.StatusFail {
return w.failJob(ctx, pluginResult.Details)
Expand Down Expand Up @@ -610,14 +607,14 @@ func computeIntegrationConfigToEnvVar(integ sdk.ProjectIntegration, prefix strin
envVars := make(map[string]string)
for k, v := range integ.Config {
suffix := strings.Replace(k, "-", "_", -1)
suffix = strings.Replace(k, ".", "_", -1)
suffix = strings.Replace(suffix, ".", "_", -1)
key := fmt.Sprintf("CDS_INTEGRATION_%s_%s", prefix, suffix)
envVars[strings.ToUpper(key)] = sdk.OneLineValue(v.Value)
}
return envVars
}

func (w *CurrentWorker) executeHooksSetupV2(ctx context.Context, fs afero.Fs, workingDir string) error {
func (w *CurrentWorker) executeHooksSetupV2(ctx context.Context, fs afero.Fs) error {
if strings.EqualFold(runtime.GOOS, "windows") {
log.Warn(ctx, "hooks are not supported on windows")
return nil
Expand All @@ -628,18 +625,20 @@ func (w *CurrentWorker) executeHooksSetupV2(ctx context.Context, fs afero.Fs, wo

// Load integrations
integrationEnv := make([]string, 0)
for _, name := range w.currentJobV2.runJobContext.Integrations.All() {
integration, err := w.V2GetIntegrationByName(ctx, name)
if err != nil {
return nil
}
for k, v := range integration.Config {
varKey := fmt.Sprintf("cds.integration.%s.%s", sdk.GetIntegrationVariablePrefix(integration.Model), k)
varValue := sdk.OneLineValue(v.Value)
envName := strings.Replace(varKey, ".", "_", -1)
envName = strings.Replace(envName, "-", "_", -1)
envName = strings.ToUpper(envName)
integrationEnv = append(integrationEnv, fmt.Sprintf("%s=%s", envName, varValue))
if w.currentJobV2.runJobContext.Integrations != nil {
for _, name := range w.currentJobV2.runJobContext.Integrations.All() {
integration, err := w.V2GetIntegrationByName(ctx, name)
if err != nil {
return nil
}
for k, v := range integration.Config {
varKey := fmt.Sprintf("cds.integration.%s.%s", sdk.GetIntegrationVariablePrefix(integration.Model), k)
varValue := sdk.OneLineValue(v.Value)
envName := strings.Replace(varKey, ".", "_", -1)
envName = strings.Replace(envName, "-", "_", -1)
envName = strings.ToUpper(envName)
integrationEnv = append(integrationEnv, fmt.Sprintf("%s=%s", envName, varValue))
}
}
}

Expand All @@ -650,7 +649,7 @@ func (w *CurrentWorker) executeHooksSetupV2(ctx context.Context, fs afero.Fs, wo
return sdk.WithStack(fmt.Errorf("invalid given basedir"))
}

workerEnv := w.Environ()
workerEnv := w.getEnvironmentForWorkerHook()

for _, h := range w.hooks {
filepath, err := basedir.RealPath(h.SetupPath)
Expand Down Expand Up @@ -744,44 +743,43 @@ func (w *CurrentWorker) setupHooksV2(ctx context.Context, currentJob CurrentJobV
TeardownPath: path.Join(workingDir, "teardown", hookFilename),
})
}
}

for _, h := range w.hooks {

info := sdk.V2SendJobRunInfo{
Message: fmt.Sprintf("Setting up worker hook %q", h.Config.Label),
Level: sdk.WorkflowRunInfoLevelInfo,
Time: time.Now(),
}
for _, h := range w.hooks {
info := sdk.V2SendJobRunInfo{
Message: fmt.Sprintf("Setting up worker hook %q", h.Config.Label),
Level: sdk.WorkflowRunInfoLevelInfo,
Time: time.Now(),
}

if err := w.ClientV2().V2QueuePushJobInfo(ctx, w.currentJobV2.runJob.Region, w.currentJobV2.runJob.ID, info); err != nil {
log.Error(ctx, "runJobServiceReadiness> Unable to send spawn info: %v", err)
}
if err := w.ClientV2().V2QueuePushJobInfo(ctx, w.currentJobV2.runJob.Region, w.currentJobV2.runJob.ID, info); err != nil {
log.Error(ctx, "runJobServiceReadiness> Unable to send spawn info: %v", err)
}

log.Info(ctx, "setting up hook at %q", h.SetupPath)
log.Info(ctx, "setting up hook at %q", h.SetupPath)

hookFile, err := fs.Create(h.SetupPath)
if err != nil {
return errors.Errorf("unable to open hook file %q in %q: %v", h.SetupPath, w.basedir.Name(), err)
}
if _, err := hookFile.WriteString(h.Config.Setup); err != nil {
_ = hookFile.Close
return errors.Errorf("unable to setup hook %q: %v", h.SetupPath, err)
}
if err := hookFile.Close(); err != nil {
return errors.Errorf("unable to setup hook %q: %v", h.SetupPath, err)
}
hookFile, err := fs.Create(h.SetupPath)
if err != nil {
return errors.Errorf("unable to open hook file %q in %q: %v", h.SetupPath, w.basedir.Name(), err)
}
if _, err := hookFile.WriteString(h.Config.Setup); err != nil {
_ = hookFile.Close
return errors.Errorf("unable to setup hook %q: %v", h.SetupPath, err)
}
if err := hookFile.Close(); err != nil {
return errors.Errorf("unable to setup hook %q: %v", h.SetupPath, err)
}

hookFile, err = fs.Create(h.TeardownPath)
if err != nil {
return errors.Errorf("unable to open hook file %q: %v", h.TeardownPath, err)
}
if _, err := hookFile.WriteString(h.Config.Teardown); err != nil {
_ = hookFile.Close
return errors.Errorf("unable to setup hook %q: %v", h.TeardownPath, err)
}
if err := hookFile.Close(); err != nil {
return errors.Errorf("unable to setup hook %q: %v", h.TeardownPath, err)
}
hookFile, err = fs.Create(h.TeardownPath)
if err != nil {
return errors.Errorf("unable to open hook file %q: %v", h.TeardownPath, err)
}
if _, err := hookFile.WriteString(h.Config.Teardown); err != nil {
_ = hookFile.Close
return errors.Errorf("unable to setup hook %q: %v", h.TeardownPath, err)
}
if err := hookFile.Close(); err != nil {
return errors.Errorf("unable to setup hook %q: %v", h.TeardownPath, err)
}
}
return nil
Expand Down
93 changes: 93 additions & 0 deletions engine/worker/internal/runV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ package internal
import (
"context"
"encoding/base64"
"fmt"
"os"
"path"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/ovh/cds/engine/test"
"github.com/ovh/cds/engine/worker/internal/plugin/mock"
"github.com/ovh/cds/engine/worker/pkg/workerruntime"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient/mock_cdsclient"
"github.com/ovh/cds/sdk/jws"
cdslog "github.com/ovh/cds/sdk/log"
"github.com/ovh/cds/sdk/log/hook"
"github.com/spf13/afero"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -229,3 +236,89 @@ func TestCurrentWorker_runJobServicesReadinessWithServiceWithCommand(t *testing.
result := w.runJobServicesReadiness(context.TODO())
require.Equal(t, sdk.V2WorkflowRunJobStatusSuccess, result.Status)
}

func TestCurrentWorker_executeHooksSetupV2(t *testing.T) {
fs := afero.NewOsFs()
basedir := "test-" + test.GetTestName(t) + "-" + sdk.RandomString(10) + "-" + fmt.Sprintf("%d", time.Now().Unix())
t.Logf("Creating worker basedir at %s", basedir)
require.NoError(t, fs.MkdirAll(basedir, os.FileMode(0755)))

ctrl := gomock.NewController(t)
mockClient := mock_cdsclient.NewMockV2WorkerInterface(ctrl)

mockClient.EXPECT().V2QueuePushJobInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
func(ctx context.Context, regionName string, jobRunID string, msg sdk.V2SendJobRunInfo) error {
require.Equal(t, sdk.WorkflowRunInfoLevelInfo, msg.Level)
return nil
},
)
mockClient.EXPECT().ProjectIntegrationWorkerHookGet(gomock.Any(), gomock.Any()).DoAndReturn(
func(projectKey string, integrationName string) (*sdk.WorkerHookProjectIntegrationModel, error) {
return nil, sdk.ErrNotFound
},
)
mockClient.EXPECT().ProjectIntegrationGet(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(projectKey string, integrationName string, clearPassword bool) (sdk.ProjectIntegration, error) {
return sdk.ProjectIntegration{}, nil
},
)

// Setup test worker
wk := &CurrentWorker{
basedir: afero.NewBasePathFs(fs, basedir),
cfg: &workerruntime.WorkerConfig{CDNEndpoint: "https://cdn.local"},
clientV2: mockClient,
}

wk.currentJobV2.runJob = &sdk.V2WorkflowRunJob{
ID: sdk.UUID(),
Status: sdk.StatusBuilding,
JobID: "myjob",
Region: "build",
Job: sdk.V2Job{
Region: "build",
Steps: []sdk.ActionStep{
{
ID: "step-0",
Run: "exit 0",
ContinueOnError: true,
},
},
},
}
wk.SetContextForTestJobV2(t, context.TODO())
wk.currentJobV2.runJobContext = sdk.WorkflowRunJobsContext{}
wk.currentJobV2.runJobContext.Integrations = &sdk.JobIntegrationsContext{ArtifactManager: "foo"}
wk.currentJobV2.integrations = make(map[string]sdk.ProjectIntegration)

wk.hooks = []workerHook{{
Config: sdk.WorkerHookSetupTeardownScripts{
Priority: 0,
Label: "foo",
Setup: `#!/bin/bash
export NEW_VAR=testfoo
`,
Teardown: `#!/bin/bash
echo 'done'
`,
},
SetupPath: path.Join(basedir, "setup", "test-hook"),
TeardownPath: path.Join(basedir, "teardown", "test-hook"),
}}

err := wk.setupHooksV2(context.TODO(), wk.currentJobV2, wk.basedir, basedir)
require.NoError(t, err)

err = wk.executeHooksSetupV2(context.TODO(), wk.basedir)
require.NoError(t, err)

var found bool
for k, v := range wk.currentJobV2.envFromHooks {
if k == "NEW_VAR" && v == "testfoo" {
found = true
break
}
}
t.Logf("envFromHooks: %v", wk.currentJobV2.envFromHooks)
require.True(t, found)
}
1 change: 0 additions & 1 deletion tests/fixtures/ITSCWRKFLWRT1/push-artifact.pip.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ jobs:
- job: Release-to-staging
stage: Stage 2
steps:
- pushBuildInfo: '{{.cds.workflow}}'
- promote:
artifacts: .*.txt
srcMaturity: snapshot
Expand Down

0 comments on commit 655542e

Please sign in to comment.