Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

feat: optional command timeout field in config file that defaults to syncTimeout #3228

Merged
merged 1 commit into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type repo interface {
func (d *Daemon) getManifestStore(r repo) (manifests.Store, error) {
absPaths := git.MakeAbsolutePaths(r, d.GitConfig.Paths)
if d.ManifestGenerationEnabled {
return manifests.NewConfigAware(r.Dir(), absPaths, d.Manifests)
return manifests.NewConfigAware(r.Dir(), absPaths, d.Manifests, d.SyncTimeout)
}
return manifests.NewRawFiles(r.Dir(), absPaths, d.Manifests), nil
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/manifests/configaware.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/fluxcd/flux/pkg/image"
"github.com/fluxcd/flux/pkg/resource"
Expand All @@ -31,12 +32,15 @@ type configAware struct {
// have a set of resources
mu sync.RWMutex
resourcesByID map[string]resourceWithOrigin

// default command timeout
defaultTimeout time.Duration
}

// NewConfigAware constructs a `Store` that processes in-repo config
// files (`.flux.yaml`) where present, and otherwise looks for "raw"
// YAML files.
func NewConfigAware(baseDir string, targetPaths []string, manifests Manifests) (*configAware, error) {
func NewConfigAware(baseDir string, targetPaths []string, manifests Manifests, syncTimeout time.Duration) (*configAware, error) {
configFiles, rawManifestDirs, err := splitConfigFilesAndRawManifestPaths(baseDir, targetPaths)
if err != nil {
return nil, err
Expand All @@ -48,9 +52,10 @@ func NewConfigAware(baseDir string, targetPaths []string, manifests Manifests) (
baseDir: baseDir,
paths: rawManifestDirs,
},
manifests: manifests,
baseDir: baseDir,
configFiles: configFiles,
manifests: manifests,
baseDir: baseDir,
configFiles: configFiles,
defaultTimeout: syncTimeout,
}
return result, nil
}
Expand Down Expand Up @@ -146,7 +151,7 @@ func (ca *configAware) SetWorkloadContainerImage(ctx context.Context, resourceID
if err := ca.rawFiles.setManifestWorkloadContainerImage(resWithOrigin.resource, container, newImageID); err != nil {
return err
}
} else if err := resWithOrigin.configFile.SetWorkloadContainerImage(ctx, ca.manifests, resWithOrigin.resource, container, newImageID); err != nil {
} else if err := resWithOrigin.configFile.SetWorkloadContainerImage(ctx, ca.manifests, resWithOrigin.resource, container, newImageID, ca.defaultTimeout); err != nil {
return err
}
// Reset resources, since we have modified one
Expand All @@ -168,7 +173,7 @@ func (ca *configAware) UpdateWorkloadPolicies(ctx context.Context, resourceID re
changed, err = ca.rawFiles.updateManifestWorkloadPolicies(resWithOrigin.resource, update)
} else {
cf := resWithOrigin.configFile
changed, err = cf.UpdateWorkloadPolicies(ctx, ca.manifests, resWithOrigin.resource, update)
changed, err = cf.UpdateWorkloadPolicies(ctx, ca.manifests, resWithOrigin.resource, update, ca.defaultTimeout)
}
if err != nil {
return false, err
Expand Down Expand Up @@ -210,7 +215,7 @@ func (ca *configAware) getResourcesByID(ctx context.Context) (map[string]resourc
}

for _, cf := range ca.configFiles {
resourceManifests, err := cf.GenerateManifests(ctx, ca.manifests)
resourceManifests, err := cf.GenerateManifests(ctx, ca.manifests, ca.defaultTimeout)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/manifests/configaware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -53,7 +54,7 @@ func setup(t *testing.T, paths []string, configs ...config) (*configAware, strin
ioutil.WriteFile(filepath.Join(baseDir, p, ConfigFilename), []byte(c.fluxyaml), 0600)
}
}
frs, err := NewConfigAware(baseDir, searchPaths, manifests)
frs, err := NewConfigAware(baseDir, searchPaths, manifests, time.Minute)
assert.NoError(t, err)
return frs, baseDir, cleanup
}
Expand Down
104 changes: 53 additions & 51 deletions pkg/manifests/configfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,27 @@ import (
"github.com/ghodss/yaml"
"github.com/pkg/errors"
jsonschema "github.com/xeipuuv/gojsonschema"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/fluxcd/flux/pkg/image"
"github.com/fluxcd/flux/pkg/resource"
)

const (
ConfigFilename = ".flux.yaml"
CommandTimeout = time.Minute
)

// This is easier to read as YAML, trust me.
const configSchemaYAML = `
"$schema": http://json-schema.org/draft-07/schema#
definitions:
command:
type: object
required: ['command']
type: object
properties:
command: { type: string }
timeout: { type: string }
additionalProperties: false
version: { const: 1 }
type: object
oneOf:
Expand Down Expand Up @@ -110,39 +114,29 @@ type ConfigFile struct {
// CommandUpdated represents a config in which updates are done by
// execing commands as given.
type CommandUpdated struct {
Generators []Generator `json:"generators"`
Updaters []Updater `json:"updaters,omitempty"`
Generators []Command `json:"generators"`
Updaters []Updater `json:"updaters,omitempty"`
}

// Generator is an individual command for generating manifests.
type Generator struct {
Command string `json:"command,omitempty"`
// Command is an individual command and timeout for generating manifests.
type Command struct {
marshallford marked this conversation as resolved.
Show resolved Hide resolved
Command string `json:"command,omitempty"`
Timeout *metav1.Duration `json:"timeout,omitempty"`
}

// Updater gives a means for updating image refs and a means for
// updating policy in a manifest.
type Updater struct {
ContainerImage ContainerImageUpdater `json:"containerImage,omitempty"`
Policy PolicyUpdater `json:"policy,omitempty"`
}

// ContainerImageUpdater is a command for updating the image used by a
// container, in a manifest.
type ContainerImageUpdater struct {
Command string `json:"command,omitempty"`
}

// PolicyUpdater is a command for updating a policy for a manifest.
type PolicyUpdater struct {
Command string `json:"command,omitempty"`
ContainerImage Command `json:"containerImage,omitempty"`
Policy Command `json:"policy,omitempty"`
}

// PatchUpdated represents a config in which updates are done by
// maintaining a patch, which is calculating from, and applied to, the
// generated manifests.
type PatchUpdated struct {
Generators []Generator `json:"generators"`
PatchFile string `json:"patchFile,omitempty"`
Generators []Command `json:"generators"`
PatchFile string `json:"patchFile,omitempty"`
generatorsResultCache []byte
}

Expand Down Expand Up @@ -229,23 +223,23 @@ func (cf *ConfigFile) ConfigRelativeToWorkingDir() string {

// GenerateManifests returns the manifests generated (and patched, if
// necessary) according to the config file.
func (cf *ConfigFile) GenerateManifests(ctx context.Context, manifests Manifests) ([]byte, error) {
func (cf *ConfigFile) GenerateManifests(ctx context.Context, manifests Manifests, defaultTimeout time.Duration) ([]byte, error) {
if cf.PatchUpdated != nil {
_, finalBytes, _, err := cf.getGeneratedAndPatchedManifests(ctx, manifests)
_, finalBytes, _, err := cf.getGeneratedAndPatchedManifests(ctx, manifests, defaultTimeout)
return finalBytes, err
}
return cf.getGeneratedManifests(ctx, manifests, cf.CommandUpdated.Generators)
return cf.getGeneratedManifests(ctx, manifests, cf.CommandUpdated.Generators, defaultTimeout)
}

func (cf *ConfigFile) SetWorkloadContainerImage(ctx context.Context, manifests Manifests, r resource.Resource, container string, newImageID image.Ref) error {
func (cf *ConfigFile) SetWorkloadContainerImage(ctx context.Context, manifests Manifests, r resource.Resource, container string, newImageID image.Ref, defaultTimeout time.Duration) error {
if cf.PatchUpdated != nil {
return cf.updatePatchFile(ctx, manifests, func(previousManifests []byte) ([]byte, error) {
return manifests.SetWorkloadContainerImage(previousManifests, r.ResourceID(), container, newImageID)
})
}, defaultTimeout)
}

// Command-updated
result := cf.execContainerImageUpdaters(ctx, r.ResourceID(), container, newImageID.Name.String(), newImageID.Tag)
result := cf.execContainerImageUpdaters(ctx, r.ResourceID(), container, newImageID.Name.String(), newImageID.Tag, defaultTimeout)
if len(result) == 0 {
return makeNoCommandsRunErr("update.containerImage", cf)
}
Expand All @@ -264,7 +258,7 @@ func (cf *ConfigFile) SetWorkloadContainerImage(ctx context.Context, manifests M

// UpdateWorkloadPolicies updates policies for a workload, using
// commands or patching according to the config file.
func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Manifests, r resource.Resource, update resource.PolicyUpdate) (bool, error) {
func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Manifests, r resource.Resource, update resource.PolicyUpdate, defaultTimeout time.Duration) (bool, error) {
if cf.PatchUpdated != nil {
var changed bool
err := cf.updatePatchFile(ctx, manifests, func(previousManifests []byte) ([]byte, error) {
Expand All @@ -273,7 +267,7 @@ func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Mani
changed = bytes.Compare(previousManifests, updatedManifests) != 0
}
return updatedManifests, err
})
}, defaultTimeout)
return changed, err
}

Expand All @@ -288,7 +282,7 @@ func (cf *ConfigFile) UpdateWorkloadPolicies(ctx context.Context, manifests Mani
}

for key, value := range changes {
result := cf.execPolicyUpdaters(ctx, r.ResourceID(), key, value)
result := cf.execPolicyUpdaters(ctx, r.ResourceID(), key, value, defaultTimeout)
if len(result) == 0 {
return false, makeNoCommandsRunErr("updaters.policy", cf)
}
Expand Down Expand Up @@ -324,11 +318,11 @@ type ConfigFileCombinedExecResult struct {

// getGeneratedAndPatchedManifests is used to generate manifests when
// the config is patchUpdated.
func (cf *ConfigFile) getGeneratedAndPatchedManifests(ctx context.Context, manifests Manifests) ([]byte, []byte, string, error) {
func (cf *ConfigFile) getGeneratedAndPatchedManifests(ctx context.Context, manifests Manifests, defaultTimeout time.Duration) ([]byte, []byte, string, error) {
generatedManifests := cf.PatchUpdated.generatorsResultCache
if generatedManifests == nil {
var err error
generatedManifests, err = cf.getGeneratedManifests(ctx, manifests, cf.PatchUpdated.Generators)
generatedManifests, err = cf.getGeneratedManifests(ctx, manifests, cf.PatchUpdated.Generators, defaultTimeout)
if err != nil {
return nil, nil, "", err
}
Expand Down Expand Up @@ -364,9 +358,9 @@ func (cf *ConfigFile) getGeneratedAndPatchedManifests(ctx context.Context, manif
// getGeneratedManifests is used to produce the manifests based _only_
// on the generators in the config. This is sufficient for
// commandUpdated config, and the first step for patchUpdated config.
func (cf *ConfigFile) getGeneratedManifests(ctx context.Context, manifests Manifests, generators []Generator) ([]byte, error) {
func (cf *ConfigFile) getGeneratedManifests(ctx context.Context, manifests Manifests, generators []Command, defaultTimeout time.Duration) ([]byte, error) {
buf := bytes.NewBuffer(nil)
for i, cmdResult := range cf.execGenerators(ctx, generators) {
for i, cmdResult := range cf.execGenerators(ctx, generators, defaultTimeout) {
if cmdResult.Error != nil {
err := fmt.Errorf("error executing generator command %q from file %q: %s\nerror output:\n%s\ngenerated output:\n%s",
generators[i].Command,
Expand All @@ -386,8 +380,8 @@ func (cf *ConfigFile) getGeneratedManifests(ctx context.Context, manifests Manif

// updatePatchFile calculates the patch given a transformation, and
// updates the patch file given in the config.
func (cf *ConfigFile) updatePatchFile(ctx context.Context, manifests Manifests, updateFn func(previousManifests []byte) ([]byte, error)) error {
generatedManifests, patchedManifests, patchFilePath, err := cf.getGeneratedAndPatchedManifests(ctx, manifests)
func (cf *ConfigFile) updatePatchFile(ctx context.Context, manifests Manifests, updateFn func(previousManifests []byte) ([]byte, error), defaultTimeout time.Duration) error {
generatedManifests, patchedManifests, patchFilePath, err := cf.getGeneratedAndPatchedManifests(ctx, manifests, defaultTimeout)
if err != nil {
return fmt.Errorf("error parsing generated, patched output from file %s: %s", cf.configPathRelative, err)
}
Expand All @@ -404,12 +398,16 @@ func (cf *ConfigFile) updatePatchFile(ctx context.Context, manifests Manifests,

// execGenerators executes all the generators given and returns the
// results; it will stop at the first failing command.
func (cf *ConfigFile) execGenerators(ctx context.Context, generators []Generator) []ConfigFileExecResult {
func (cf *ConfigFile) execGenerators(ctx context.Context, generators []Command, defaultTimeout time.Duration) []ConfigFileExecResult {
result := []ConfigFileExecResult{}
for _, g := range generators {
stdErr := bytes.NewBuffer(nil)
stdOut := bytes.NewBuffer(nil)
err := cf.execCommand(ctx, nil, stdOut, stdErr, g.Command)
timeout := defaultTimeout
if g.Timeout != nil && g.Timeout.Duration >= time.Second {
timeout = g.Timeout.Duration
}
err := cf.execCommand(ctx, nil, stdOut, stdErr, g.Command, timeout)
r := ConfigFileExecResult{
Stdout: stdOut.Bytes(),
Stderr: stdErr.Bytes(),
Expand All @@ -427,52 +425,56 @@ func (cf *ConfigFile) execGenerators(ctx context.Context, generators []Generator
// execContainerImageUpdaters executes all the image updates in the configuration file.
// It will stop at the first error, in which case the returned error will be non-nil
func (cf *ConfigFile) execContainerImageUpdaters(ctx context.Context,
workload resource.ID, container string, image, imageTag string) []ConfigFileCombinedExecResult {
workload resource.ID, container string, image, imageTag string, defaultTimeout time.Duration) []ConfigFileCombinedExecResult {
env := makeEnvFromResourceID(workload)
env = append(env,
"FLUX_CONTAINER="+container,
"FLUX_IMG="+image,
"FLUX_TAG="+imageTag,
)
commands := []string{}
commands := []Command{}
var updaters []Updater
if cf.CommandUpdated != nil {
updaters = cf.CommandUpdated.Updaters
}
for _, u := range updaters {
commands = append(commands, u.ContainerImage.Command)
commands = append(commands, u.ContainerImage)
}
return cf.execCommandsWithCombinedOutput(ctx, env, commands)
return cf.execCommandsWithCombinedOutput(ctx, env, commands, defaultTimeout)
}

// execPolicyUpdaters executes all the policy update commands given in
// the configuration file. An empty policyValue means remove the
// policy. It will stop at the first error, in which case the returned
// error will be non-nil
func (cf *ConfigFile) execPolicyUpdaters(ctx context.Context,
workload resource.ID, policyName, policyValue string) []ConfigFileCombinedExecResult {
workload resource.ID, policyName, policyValue string, defaultTimeout time.Duration) []ConfigFileCombinedExecResult {
env := makeEnvFromResourceID(workload)
env = append(env, "FLUX_POLICY="+policyName)
if policyValue != "" {
env = append(env, "FLUX_POLICY_VALUE="+policyValue)
}
commands := []string{}
commands := []Command{}
var updaters []Updater
if cf.CommandUpdated != nil {
updaters = cf.CommandUpdated.Updaters
}
for _, u := range updaters {
commands = append(commands, u.Policy.Command)
commands = append(commands, u.Policy)
}
return cf.execCommandsWithCombinedOutput(ctx, env, commands)
return cf.execCommandsWithCombinedOutput(ctx, env, commands, defaultTimeout)
}

func (cf *ConfigFile) execCommandsWithCombinedOutput(ctx context.Context, env []string, commands []string) []ConfigFileCombinedExecResult {
func (cf *ConfigFile) execCommandsWithCombinedOutput(ctx context.Context, env []string, commands []Command, defaultTimeout time.Duration) []ConfigFileCombinedExecResult {
env = append(env, "PATH="+os.Getenv("PATH"))
result := []ConfigFileCombinedExecResult{}
for _, c := range commands {
stdOutAndErr := bytes.NewBuffer(nil)
err := cf.execCommand(ctx, env, stdOutAndErr, stdOutAndErr, c)
timeout := defaultTimeout
if c.Timeout != nil && c.Timeout.Duration >= time.Second {
timeout = c.Timeout.Duration
}
err := cf.execCommand(ctx, env, stdOutAndErr, stdOutAndErr, c.Command, timeout)
r := ConfigFileCombinedExecResult{
Output: stdOutAndErr.Bytes(),
Error: err,
Expand All @@ -486,8 +488,8 @@ func (cf *ConfigFile) execCommandsWithCombinedOutput(ctx context.Context, env []
return result
}

func (cf *ConfigFile) execCommand(ctx context.Context, env []string, stdOut, stdErr io.Writer, command string) error {
cmdCtx, cancel := context.WithTimeout(ctx, CommandTimeout)
func (cf *ConfigFile) execCommand(ctx context.Context, env []string, stdOut, stdErr io.Writer, command string, timeout time.Duration) error {
cmdCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
cmd := exec.CommandContext(ctx, "/bin/sh", "-c", command)
cmd.Env = env
Expand Down
Loading