diff --git a/.k8s-image-swapper.yml b/.k8s-image-swapper.yml index 16c3f1df..0724790c 100644 --- a/.k8s-image-swapper.yml +++ b/.k8s-image-swapper.yml @@ -3,6 +3,20 @@ dryRun: true logLevel: trace logFormat: console +# imageSwapPolicy defines the mutation strategy used by the webhook. +# - always: Will always swap the image regardless of the image existence in the target registry. +# This can result in pods ending in state ImagePullBack if images fail to be copied to the target registry. +# - exists: Only swaps the image if it exits in the target registry. +# This can result in pods pulling images from the source registry, e.g. the first pod pulls +# from source registry, subsequent pods pull from target registry. +imageSwapPolicy: exists + +# imageCopyPolicy defines the image copy strategy used by the webhook. +# - delayed: Submits the copy job to a process queue and moves on. +# - immediate: Submits the copy job to a process queue and waits for it to finish (deadline 8s). +# - force: Attempts to immediately copy the image (deadline 8s). +imagePullPolicy: delayed + source: # Filters provide control over what pods will be processed. # By default all pods will be processed. If a condition matches, the pod will NOT be processed. @@ -28,7 +42,9 @@ target: accountId: 123456789 region: ap-southeast-2 ecrOptions: - tags: [] + tags: + - key: CreatedBy + value: k8s-image-swapper imageTagMutability: MUTABLE imageScanningConfiguration: imageScanOnPush: true diff --git a/cmd/root.go b/cmd/root.go index 35b2c2d4..4073f500 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -31,8 +31,9 @@ import ( "os" - "github.com/estahn/k8s-image-swapper/pkg" + "github.com/estahn/k8s-image-swapper/pkg/config" "github.com/estahn/k8s-image-swapper/pkg/registry" + "github.com/estahn/k8s-image-swapper/pkg/types" "github.com/estahn/k8s-image-swapper/pkg/webhook" homedir "github.com/mitchellh/go-homedir" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -44,7 +45,7 @@ import ( ) var cfgFile string -var cfg *pkg.Config = &pkg.Config{} +var cfg *config.Config = &config.Config{} // rootCmd represents the base command when called without any subcommands var rootCmd = &cobra.Command{ @@ -66,7 +67,17 @@ A mutating webhook for Kubernetes, pointing the images to a new location.`, os.Exit(1) } - wh, err := webhook.NewImageSwapperWebhook(rClient, cfg.Source.Filters) + imageSwapPolicy, err := types.ParseImageSwapPolicy(cfg.ImageSwapPolicy) + if err != nil { + log.Err(err) + } + + imageCopyPolicy, err := types.ParseImageCopyPolicy(cfg.ImageCopyPolicy) + if err != nil { + log.Err(err) + } + + wh, err := webhook.NewImageSwapperWebhook(rClient, cfg.Source.Filters, imageSwapPolicy, imageCopyPolicy) if err != nil { log.Err(err).Msg("error creating webhook") os.Exit(1) diff --git a/docs/configuration.md b/docs/configuration.md index 2aa44762..77c319fb 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -23,6 +23,26 @@ The option `logLevel` & `logFormat` allow to adjust the verbosity and format (e. logFormat: console ``` +## ImageSwapPolicy + +The option `imageSwapPolicy` (default: `exists`) defines the mutation strategy used. + +* `always`: Will always swap the image regardless of the image existence in the target registry. + This can result in pods ending in state ImagePullBack if images fail to be copied to the target registry. +* `exists`: Only swaps the image if it exits in the target registry. + This can result in pods pulling images from the source registry, e.g. the first pod pulls + from source registry, subsequent pods pull from target registry. + +## ImageCopyPolicy + +The option `imageCopyPolicy` (default: `delayed`) defines the image copy strategy used. + +* `delayed`: Submits the copy job to a process queue and moves on. +* `immediate`: Submits the copy job to a process queue and waits for it to finish (deadline 8s). +* `force`: Attempts to immediately copy the image (deadline 8s). + + + ## Source This section configures details about the image source. @@ -95,7 +115,7 @@ Below you will find a list of common queries and/or ideas: ```yaml source: filters: - - jmespath: "contains(container.image, `.dkr.ecr.`) && contains(container.image, `.amazonaws.com`)" + - jmespath: "contains(container.image, '.dkr.ecr.') && contains(container.image, '.amazonaws.com')" ``` `k8s-image-swapper` will log the filter data and result in `debug` mode. diff --git a/go.mod b/go.mod index 7334b5c7..9ced9c69 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/spf13/cobra v1.1.1 github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.6.1 + gopkg.in/yaml.v2 v2.3.0 k8s.io/api v0.19.3 k8s.io/apimachinery v0.20.0 ) diff --git a/pkg/config.go b/pkg/config.go deleted file mode 100644 index 79c28270..00000000 --- a/pkg/config.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright © 2020 Enrico Stahn - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -*/ -package pkg - -import "fmt" - -type Config struct { - LogLevel string `yaml:"logFormat" validate:"oneof=trace debug info warn error fatal"` - LogFormat string `yaml:"logFormat" validate:"oneof=json console"` - - ListenAddress string - - DryRun bool `yaml:"dryRun"` - Source Source `yaml:"source"` - Target Target `yaml:"target"` - - TLSCertFile string - TLSKeyFile string -} - -type Source struct { - Filters []JMESPathFilter `yaml:"filters"` -} - -type JMESPathFilter struct { - JMESPath string `yaml:"jmespath"` -} - -type Target struct { - AWS AWS `yaml:"aws"` -} - -type AWS struct { - AccountID string `yaml:"accountId"` - Region string `yaml:"region"` -} - -func (a *AWS) EcrDomain() string { - return fmt.Sprintf("%s.dkr.ecr.%s.amazonaws.com", a.AccountID, a.Region) -} diff --git a/pkg/registry/client.go b/pkg/registry/client.go index a7cfb777..bec081e5 100644 --- a/pkg/registry/client.go +++ b/pkg/registry/client.go @@ -7,7 +7,7 @@ type Client interface { CopyImage() error PullImage() error PutImage() error - ImageExists() bool + ImageExists(ref string) bool // Endpoint returns the domain of the registry Endpoint() string diff --git a/pkg/registry/ecr.go b/pkg/registry/ecr.go index e0252511..ed9d0091 100644 --- a/pkg/registry/ecr.go +++ b/pkg/registry/ecr.go @@ -2,6 +2,7 @@ package registry import ( "encoding/base64" + "os/exec" "time" "github.com/aws/aws-sdk-go/aws" @@ -32,6 +33,16 @@ func (e *ECRClient) CreateRepository(name string) error { _, err := e.client.CreateRepository(&ecr.CreateRepositoryInput{ RepositoryName: aws.String(name), + ImageScanningConfiguration: &ecr.ImageScanningConfiguration{ + ScanOnPush: aws.Bool(true), + }, + ImageTagMutability: aws.String(ecr.ImageTagMutabilityMutable), + Tags: []*ecr.Tag{ + { + Key: aws.String("CreatedBy"), + Value: aws.String("k8s-image-swapper"), + }, + }, }) if err != nil { if aerr, ok := err.(awserr.Error); ok { @@ -69,8 +80,29 @@ func (e *ECRClient) PutImage() error { panic("implement me") } -func (e *ECRClient) ImageExists() bool { - panic("implement me") +func (e *ECRClient) ImageExists(ref string) bool { + if _, found := e.cache.Get(ref); found { + return true + } + + app := "skopeo" + args := []string{ + "inspect", + "--retry-times", "3", + "docker://" + ref, + "--creds", e.Credentials(), + } + + log.Trace().Str("app", app).Strs("args", args).Msg("executing command to inspect image") + cmd := exec.Command(app, args...) + + if _, err := cmd.Output(); err != nil { + return false + } + + e.cache.Set(ref, "", 1) + + return true } func (e *ECRClient) Endpoint() string { diff --git a/pkg/types/types.go b/pkg/types/types.go new file mode 100644 index 00000000..59e685e0 --- /dev/null +++ b/pkg/types/types.go @@ -0,0 +1,48 @@ +package types + +import "fmt" + +type ImageSwapPolicy int + +const ( + ImageSwapPolicyAlways = iota + ImageSwapPolicyExists +) + +func (p ImageSwapPolicy) String() string { + return [...]string{"always", "exists"}[p] +} + +func ParseImageSwapPolicy(p string) (ImageSwapPolicy, error) { + switch p { + case ImageSwapPolicy(ImageSwapPolicyAlways).String(): + return ImageSwapPolicyAlways, nil + case ImageSwapPolicy(ImageSwapPolicyExists).String(): + return ImageSwapPolicyExists, nil + } + return ImageSwapPolicyExists, fmt.Errorf("unknown image swap policy string: '%s', defaulting to exists", p) +} + +type ImageCopyPolicy int + +const ( + ImageCopyPolicyDelayed = iota + ImageCopyPolicyImmediate + ImageCopyPolicyForce +) + +func (p ImageCopyPolicy) String() string { + return [...]string{"delayed", "immediate", "force"}[p] +} + +func ParseImageCopyPolicy(p string) (ImageCopyPolicy, error) { + switch p { + case ImageCopyPolicy(ImageCopyPolicyDelayed).String(): + return ImageCopyPolicyDelayed, nil + case ImageCopyPolicy(ImageCopyPolicyImmediate).String(): + return ImageCopyPolicyImmediate, nil + case ImageCopyPolicy(ImageCopyPolicyForce).String(): + return ImageCopyPolicyForce, nil + } + return ImageCopyPolicyDelayed, fmt.Errorf("unknown image copy policy string: '%s', defaulting to delayed", p) +} \ No newline at end of file diff --git a/pkg/webhook/image_swapper.go b/pkg/webhook/image_swapper.go index 2e1da462..cc614725 100644 --- a/pkg/webhook/image_swapper.go +++ b/pkg/webhook/image_swapper.go @@ -8,9 +8,10 @@ import ( "github.com/alitto/pond" "github.com/containers/image/v5/transports/alltransports" - "github.com/containers/image/v5/types" - "github.com/estahn/k8s-image-swapper/pkg" + ctypes "github.com/containers/image/v5/types" + "github.com/estahn/k8s-image-swapper/pkg/config" "github.com/estahn/k8s-image-swapper/pkg/registry" + types "github.com/estahn/k8s-image-swapper/pkg/types" "github.com/jmespath/go-jmespath" "github.com/rs/zerolog/log" "github.com/slok/kubewebhook/pkg/webhook" @@ -29,23 +30,28 @@ type ImageSwapper struct { // filters defines a list of expressions to remove objects that should not be processed, // by default all objects will be processed - filters []pkg.JMESPathFilter + filters []config.JMESPathFilter - // downloader manages the download pool - downloader *pond.WorkerPool + // copier manages the jobs copying the images to the target registry + copier *pond.WorkerPool + + imageSwapPolicy types.ImageSwapPolicy + imageCopyPolicy types.ImageCopyPolicy } // NewImageSwapper returns a new ImageSwapper initialized. -func NewImageSwapper(registryClient registry.Client, filters []pkg.JMESPathFilter) mutating.Mutator { +func NewImageSwapper(registryClient registry.Client, filters []config.JMESPathFilter, imageSwapPolicy types.ImageSwapPolicy, imageCopyPolicy types.ImageCopyPolicy) mutating.Mutator { return &ImageSwapper{ registryClient: registryClient, - filters: filters, - downloader: pond.New(100, 1000), + filters: filters, + copier: pond.New(100, 1000), + imageSwapPolicy: imageSwapPolicy, + imageCopyPolicy: imageCopyPolicy, } } -func NewImageSwapperWebhook(registryClient registry.Client, filters []pkg.JMESPathFilter) (webhook.Webhook, error) { - imageSwapper := NewImageSwapper(registryClient, filters) +func NewImageSwapperWebhook(registryClient registry.Client, filters []config.JMESPathFilter, imageSwapPolicy types.ImageSwapPolicy, imageCopyPolicy types.ImageCopyPolicy) (webhook.Webhook, error) { + imageSwapper := NewImageSwapper(registryClient, filters, imageSwapPolicy, imageCopyPolicy) mt := mutating.MutatorFunc(imageSwapper.Mutate) mcfg := mutating.WebhookConfig{ Name: "k8s-image-swapper", @@ -87,10 +93,10 @@ func (p *ImageSwapper) Mutate(ctx context.Context, obj metav1.Object) (bool, err lctx := logger. WithContext(ctx) - for i := range pod.Spec.Containers { - srcRef, err := alltransports.ParseImageName("docker://" + pod.Spec.Containers[i].Image) + for i, container := range pod.Spec.Containers { + srcRef, err := alltransports.ParseImageName("docker://" + container.Image) if err != nil { - log.Ctx(lctx).Warn().Msgf("invalid source name %s: %v", pod.Spec.Containers[i].Image, err) + log.Ctx(lctx).Warn().Msgf("invalid source name %s: %v", container.Image, err) continue } @@ -99,37 +105,69 @@ func (p *ImageSwapper) Mutate(ctx context.Context, obj metav1.Object) (bool, err continue } - filterCtx := NewFilterContext(*ar, pod, pod.Spec.Containers[i]) + filterCtx := NewFilterContext(*ar, pod, container) if filterMatch(filterCtx, p.filters) { - log.Ctx(lctx).Info().Msg("skip due to filter condition") + log.Ctx(lctx).Debug().Msg("skip due to filter condition") continue } targetImage := p.targetName(srcRef) - log.Ctx(lctx).Debug().Str("image", targetImage).Msg("set new container image") - pod.Spec.Containers[i].Image = targetImage + copyFn := func() { + // Avoid unnecessary copying by ending early. For images such as :latest we adhere to the + // image pull policy. + if p.registryClient.ImageExists(targetImage) && container.ImagePullPolicy != corev1.PullAlways { + return + } - // Create repository - createRepoName := reference.TrimNamed(srcRef.DockerReference()).String() - log.Ctx(lctx).Debug().Str("repository", createRepoName).Msg("create repository") - if err := p.registryClient.CreateRepository(createRepoName); err != nil { - log.Err(err) - } + // Create repository + createRepoName := reference.TrimNamed(srcRef.DockerReference()).String() + log.Ctx(lctx).Debug().Str("repository", createRepoName).Msg("create repository") + if err := p.registryClient.CreateRepository(createRepoName); err != nil { + log.Err(err) + } - p.downloader.Submit(func() { + // Copy image log.Ctx(lctx).Trace().Str("source", srcRef.DockerReference().String()).Str("target", targetImage).Msg("copy image") if err := copyImage(srcRef.DockerReference().String(), "", targetImage, p.registryClient.Credentials()); err != nil { log.Ctx(lctx).Err(err).Str("source", srcRef.DockerReference().String()).Str("target", targetImage).Msg("copying image to target registry failed") } - }) + } + + // imageCopyPolicy + switch p.imageCopyPolicy { + case types.ImageCopyPolicyDelayed: + p.copier.Submit(copyFn) + case types.ImageCopyPolicyImmediate: + // TODO: Implement deadline + p.copier.SubmitAndWait(copyFn) + case types.ImageCopyPolicyForce: + // TODO: Implement deadline + copyFn() + default: + panic("unknown imageCopyPolicy") + } + + // imageSwapPolicy + switch p.imageSwapPolicy { + case types.ImageSwapPolicyAlways: + log.Ctx(lctx).Debug().Str("image", targetImage).Msg("set new container image") + pod.Spec.Containers[i].Image = targetImage + case types.ImageSwapPolicyExists: + if p.registryClient.ImageExists(targetImage) { + log.Ctx(lctx).Debug().Str("image", targetImage).Msg("set new container image") + pod.Spec.Containers[i].Image = targetImage + } + default: + panic("unknown imageSwapPolicy") + } } return false, nil } // filterMatch returns true if one of the filters matches the context -func filterMatch(ctx FilterContext, filters []pkg.JMESPathFilter) bool { +func filterMatch(ctx FilterContext, filters []config.JMESPathFilter) bool { // Simplify FilterContext to be easier searchable by marshaling it to JSON and back to an interface var filterContext interface{} jsonBlob, err := json.Marshal(ctx) @@ -169,7 +207,7 @@ func filterMatch(ctx FilterContext, filters []pkg.JMESPathFilter) bool { } // targetName returns the reference in the target repository -func (p *ImageSwapper) targetName(ref types.ImageReference) string { +func (p *ImageSwapper) targetName(ref ctypes.ImageReference) string { return fmt.Sprintf("%s/%s", p.registryClient.Endpoint(), ref.DockerReference().String()) } @@ -195,15 +233,21 @@ func copyImage(src string, srcCeds string, dest string, destCreds string) error args := []string{ "--override-os", "linux", "copy", + "--retry-times", "3", "docker://" + src, "docker://" + dest, "--src-no-creds", "--dest-creds", destCreds, } - log.Trace().Str("app", app).Strs("args", args).Msg("executing command to copy image") cmd := exec.Command(app, args...) - _, err := cmd.Output() + output, err := cmd.CombinedOutput() + + log.Trace(). + Str("app", app). + Strs("args", args). + Bytes("output", output). + Msg("executed command to copy image") return err } diff --git a/pkg/webhook/image_swapper_test.go b/pkg/webhook/image_swapper_test.go index 67a5c3bf..2be3ed52 100644 --- a/pkg/webhook/image_swapper_test.go +++ b/pkg/webhook/image_swapper_test.go @@ -3,7 +3,7 @@ package webhook import ( "testing" - "github.com/estahn/k8s-image-swapper/pkg" + "github.com/estahn/k8s-image-swapper/pkg/config" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -166,8 +166,13 @@ func TestFilterMatch(t *testing.T) { }, } - assert.True(t, filterMatch(filterContext, []pkg.JMESPathFilter{{JMESPath: "obj.metadata.namespace == 'kube-system'"}})) - assert.False(t, filterMatch(filterContext, []pkg.JMESPathFilter{{JMESPath: "obj.metadata.namespace != 'kube-system'"}})) - assert.False(t, filterMatch(filterContext, []pkg.JMESPathFilter{{JMESPath: "obj"}})) - assert.True(t, filterMatch(filterContext, []pkg.JMESPathFilter{{JMESPath: "container.name == 'nginx'"}})) + assert.True(t, filterMatch(filterContext, []config.JMESPathFilter{{JMESPath: "obj.metadata.namespace == 'kube-system'"}})) + assert.False(t, filterMatch(filterContext, []config.JMESPathFilter{{JMESPath: "obj.metadata.namespace != 'kube-system'"}})) + assert.False(t, filterMatch(filterContext, []config.JMESPathFilter{{JMESPath: "obj"}})) + assert.True(t, filterMatch(filterContext, []config.JMESPathFilter{{JMESPath: "container.name == 'nginx'"}})) + // false syntax test + assert.False(t, filterMatch(filterContext, []config.JMESPathFilter{{JMESPath: "."}})) + // non-boolean value + assert.False(t, filterMatch(filterContext, []config.JMESPathFilter{{JMESPath: "obj"}})) + assert.False(t, filterMatch(filterContext, []config.JMESPathFilter{{JMESPath: "contains(container.image, '.dkr.ecr.') && contains(container.image, '.amazonaws.com')"}})) }