Skip to content

Commit

Permalink
Merge pull request #2852 from buildkite/ps-72-k8s-env-var-conveyor
Browse files Browse the repository at this point in the history
Don't try to early-set env vars
  • Loading branch information
DrJosh9000 authored Jun 27, 2024
2 parents 2ece143 + 2c938c0 commit f4b9942
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 68 deletions.
64 changes: 2 additions & 62 deletions clicommand/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ import (
"syscall"
"time"

"github.com/buildkite/agent/v3/env"
"github.com/buildkite/agent/v3/internal/job"
"github.com/buildkite/agent/v3/kubernetes"
"github.com/buildkite/agent/v3/process"
"github.com/buildkite/roko"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -387,64 +384,6 @@ var BootstrapCommand = cli.Command{
},
Action: func(c *cli.Context) error {
ctx := context.Background()

// Surprise! Before doing anything else, even loading the other
// flags/config, we connect the socket to get env vars. These are
// normally present in the environment if the bootstrap was forked
// directly, but because in k8s land we use containers to separate the
// agent and its executors, such vars won't be available unless we do
// this.
var k8sAgentSocket *kubernetes.Client
if c.Bool(KubernetesExecFlag.Name) {
k8sAgentSocket = &kubernetes.Client{ID: c.Int("kubernetes-container-id")}

rtr := roko.NewRetrier(
roko.WithMaxAttempts(7),
roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
)
regResp, err := roko.DoFunc(ctx, rtr, func(rtr *roko.Retrier) (*kubernetes.RegisterResponse, error) {
return k8sAgentSocket.Connect(ctx)
})
if err != nil {
return fmt.Errorf("error connecting to kubernetes runner: %w", err)
}

// Set our environment vars based on the registration response.
// But note that the k8s stack interprets the job definition itself,
// and sets a variety of env vars (e.g. BUILDKITE_COMMAND) that
// *could* be different to the ones the agent normally supplies.
// Examples:
// * The command container could be passed a specific
// BUILDKITE_COMMAND that is computed from the command+args
// podSpec attributes (in the kubernetes "plugin"), instead of the
// "command" attribute of the step.
// * BUILDKITE_PLUGINS is pre-processed by the k8s stack to remove
// the kubernetes "plugin". If we used the agent's default
// BUILDKITE_PLUGINS, we'd be trying to find a kubernetes plugin
// that doesn't exist.
// So we should skip setting any vars that are already set, and
// specifically any that could be deliberately *unset* by the
// k8s stack (BUILDKITE_PLUGINS could be unset if kubernetes is
// the only "plugin" in the step).
// (Maybe we could move some of the k8s stack processing in here?)
for n, v := range env.FromSlice(regResp.Env).Dump() {
// Skip these ones specifically.
// See agent-stack-k8s/internal/controller/scheduler/scheduler.go#(*jobWrapper).Build
switch n {
case "BUILDKITE_COMMAND", "BUILDKITE_ARTIFACT_PATHS", "BUILDKITE_PLUGINS":
continue
}
// Skip any that are already set.
if _, set := os.LookupEnv(n); set {
continue
}
// Set it!
if err := os.Setenv(n, v); err != nil {
return err
}
}
}

ctx, cfg, l, _, done := setupLoggerAndConfig[BootstrapConfig](ctx, c)
defer done()

Expand Down Expand Up @@ -523,7 +462,8 @@ var BootstrapCommand = cli.Command{
TracingServiceName: cfg.TracingServiceName,
JobAPI: !cfg.NoJobAPI,
DisabledWarnings: cfg.DisableWarningsFor,
K8sAgentSocket: k8sAgentSocket,
KubernetesExec: cfg.KubernetesExec,
KubernetesContainerID: cfg.KubernetesContainerID,
})

cctx, cancel := context.WithCancel(ctx)
Expand Down
6 changes: 3 additions & 3 deletions internal/job/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/buildkite/agent/v3/env"
"github.com/buildkite/agent/v3/kubernetes"
"github.com/buildkite/agent/v3/process"
)

Expand Down Expand Up @@ -169,8 +168,9 @@ type ExecutorConfig struct {
// Whether to start the JobAPI
JobAPI bool

// The connected Kubernetes socket, if needed
K8sAgentSocket *kubernetes.Client
// Whether to enable Kubernetes support, and which container we're running in
KubernetesExec bool
KubernetesContainerID int

// The warnings that have been disabled by the user
DisabledWarnings []string
Expand Down
58 changes: 55 additions & 3 deletions internal/job/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,14 @@ func (e *Executor) Run(ctx context.Context) (exitCode int) {
e.shell.SignalGracePeriod = e.ExecutorConfig.SignalGracePeriod
}

if e.K8sAgentSocket != nil {
if err := e.kubernetesSetup(ctx, e.K8sAgentSocket); err != nil {
if e.KubernetesExec {
socket := &kubernetes.Client{ID: e.KubernetesContainerID}
if err := e.kubernetesSetup(ctx, socket); err != nil {
e.shell.Errorf("Failed to start kubernetes socket client: %v", err)
return 1
}
defer func() {
_ = e.K8sAgentSocket.Exit(exitCode)
_ = socket.Exit(exitCode)
}()
}

Expand Down Expand Up @@ -1185,6 +1186,57 @@ func (e *Executor) setupRedactors() {
func (e *Executor) kubernetesSetup(ctx context.Context, k8sAgentSocket *kubernetes.Client) error {
e.shell.Commentf("Using Kubernetes support")

rtr := roko.NewRetrier(
roko.WithMaxAttempts(7),
roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
)
regResp, err := roko.DoFunc(ctx, rtr, func(rtr *roko.Retrier) (*kubernetes.RegisterResponse, error) {
return k8sAgentSocket.Connect(ctx)
})
if err != nil {
return fmt.Errorf("error connecting to kubernetes runner: %w", err)
}

// Set our environment vars based on the registration response.
// But note that the k8s stack interprets the job definition itself,
// and sets a variety of env vars (e.g. BUILDKITE_COMMAND) that
// *could* be different to the ones the agent normally supplies.
// Examples:
// * The command container could be passed a specific
// BUILDKITE_COMMAND that is computed from the command+args
// podSpec attributes (in the kubernetes "plugin"), instead of the
// "command" attribute of the step.
// * BUILDKITE_PLUGINS is pre-processed by the k8s stack to remove
// the kubernetes "plugin". If we used the agent's default
// BUILDKITE_PLUGINS, we'd be trying to find a kubernetes plugin
// that doesn't exist.
// So we should skip setting any vars that are already set, and
// specifically any that could be deliberately *unset* by the
// k8s stack (BUILDKITE_PLUGINS could be unset if kubernetes is
// the only "plugin" in the step).
// (Maybe we could move some of the k8s stack processing in here?)
//
// To think about: how to obtain the env vars early enough to set
// them in ExecutorConfig (because of how urfave/cli works, it
// must happen before App.Run, which is before the program even knows
// which subcommand is running).
for n, v := range env.FromSlice(regResp.Env).Dump() {
// Skip these ones specifically.
// See agent-stack-k8s/internal/controller/scheduler/scheduler.go#(*jobWrapper).Build
switch n {
case "BUILDKITE_COMMAND", "BUILDKITE_ARTIFACT_PATHS", "BUILDKITE_PLUGINS":
continue
}
// Skip any that are already set.
if e.shell.Env.Exists(n) {
continue
}
// Set it!
e.shell.Env.Set(n, v)
if err := os.Setenv(n, v); err != nil {
return err
}
}
// Attach the log stream to the k8s client
writer := io.MultiWriter(os.Stdout, k8sAgentSocket)
e.shell.Writer = writer
Expand Down

0 comments on commit f4b9942

Please sign in to comment.