Skip to content

Commit

Permalink
Patch CRDs with origin labels
Browse files Browse the repository at this point in the history
This allows the applied CRDs to be traced using the same labels as
currently applied to resources using a Kustomize post-render.

Kustomize is not used here as the apply logic for CRDs is different
from the approach used during releasing, where we inject the labels
in such a way that they are written back to the Helm storage in the
rendered manifest. This to match Helm's logic from which our present
code is already derived (buth with support for policies).

This also moves the full responsibility of dealing with the install
of CRDs to ourselves, as we no longer fall back to Helm's logic when
`Create` is configured as a policy during a Helm install. As this
would not allow us to add the labels.

Signed-off-by: Hidde Beydals <[email protected]>
  • Loading branch information
hiddeco committed Jan 31, 2023
1 parent 1e49ba6 commit c76c2c5
Showing 1 changed file with 103 additions and 53 deletions.
156 changes: 103 additions & 53 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ import (

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

v2 "github.com/fluxcd/helm-controller/api/v2beta1"
)

var accessor = meta.NewAccessor()

type ActionError struct {
Err error
CapturedLogs string
Expand Down Expand Up @@ -95,7 +98,7 @@ func postRenderers(hr v2.HelmRelease) (postrender.PostRenderer, error) {
return &combinedRenderer, nil
}

// Install runs an Helm install action for the given v2beta1.HelmRelease.
// Install runs a Helm install action for the given v2beta1.HelmRelease.
func (r *Runner) Install(hr v2.HelmRelease, chart *chart.Chart, values chartutil.Values) (*release.Release, error) {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -110,33 +113,31 @@ func (r *Runner) Install(hr v2.HelmRelease, chart *chart.Chart, values chartutil
install.DisableHooks = hr.Spec.GetInstall().DisableHooks
install.DisableOpenAPIValidation = hr.Spec.GetInstall().DisableOpenAPIValidation
install.Replace = hr.Spec.GetInstall().Replace
var legacyCRDsPolicy = v2.Create
if hr.Spec.GetInstall().SkipCRDs {
legacyCRDsPolicy = v2.Skip
install.SkipCRDs = true
install.Devel = true

if hr.Spec.TargetNamespace != "" {
install.CreateNamespace = hr.Spec.GetInstall().CreateNamespace
}
cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetInstall().CRDs, legacyCRDsPolicy)

renderer, err := postRenderers(hr)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy == v2.Skip || cRDsPolicy == v2.CreateReplace {
install.SkipCRDs = true
install.PostRenderer = renderer

// If user opted-in to install (or replace) CRDs, install them first.
var legacyCRDsPolicy = v2.Create
if hr.Spec.GetInstall().SkipCRDs {
legacyCRDsPolicy = v2.Skip
}
install.Devel = true
renderer, err := postRenderers(hr)
cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetInstall().CRDs, legacyCRDsPolicy)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
install.PostRenderer = renderer
if hr.Spec.TargetNamespace != "" {
install.CreateNamespace = hr.Spec.GetInstall().CreateNamespace
}

if cRDsPolicy == v2.CreateReplace {
crds := chart.CRDObjects()
if len(crds) > 0 {
if err := r.applyCRDs(cRDsPolicy, hr, chart); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy != v2.Skip && len(chart.CRDObjects()) > 0 {
if err := r.applyCRDs(cRDsPolicy, chart, setOriginVisitor(hr.Namespace, hr.Name)); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
}

Expand All @@ -163,24 +164,24 @@ func (r *Runner) Upgrade(hr v2.HelmRelease, chart *chart.Chart, values chartutil
upgrade.Force = hr.Spec.GetUpgrade().Force
upgrade.CleanupOnFail = hr.Spec.GetUpgrade().CleanupOnFail
upgrade.Devel = true

renderer, err := postRenderers(hr)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
upgrade.PostRenderer = renderer

// If user opted-in to upgrade CRDs, upgrade them first.
cRDsPolicy, err := r.validateCRDsPolicy(hr.Spec.GetUpgrade().CRDs, v2.Skip)
if err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy != v2.Skip {
crds := chart.CRDObjects()
if len(crds) > 0 {
if err := r.applyCRDs(cRDsPolicy, hr, chart); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
if cRDsPolicy != v2.Skip && len(chart.CRDObjects()) > 0 {
if err := r.applyCRDs(cRDsPolicy, chart, setOriginVisitor(hr.Namespace, hr.Name)); err != nil {
return nil, wrapActionErr(r.logBuffer, err)
}
}

rel, err := upgrade.Run(hr.GetReleaseName(), chart, values.AsMap())
return rel, wrapActionErr(r.logBuffer, err)
}
Expand All @@ -196,7 +197,7 @@ func (r *Runner) validateCRDsPolicy(policy v2.CRDsPolicy, defaultValue v2.CRDsPo
case v2.CreateReplace:
break
default:
return policy, fmt.Errorf("invalid CRD upgrade policy '%s' defined in field upgradeCRDs, valid values are '%s', '%s' or '%s'",
return policy, fmt.Errorf("invalid CRD policy '%s' defined in field CRDsPolicy, valid values are '%s', '%s' or '%s'",
policy, v2.Skip, v2.Create, v2.CreateReplace,
)
}
Expand All @@ -210,76 +211,83 @@ func (*rootScoped) Name() meta.RESTScopeName {
}

// This has been adapted from https://github.com/helm/helm/blob/v3.5.4/pkg/action/install.go#L127
func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart.Chart) error {
cfg := r.config
cfg.Log("apply CRDs with policy %s", policy)
func (r *Runner) applyCRDs(policy v2.CRDsPolicy, chart *chart.Chart, visitorFunc ...resource.VisitorFunc) error {
r.config.Log("apply CRDs with policy %s", policy)

// Collect all CRDs from all files in `crds` directory.
allCrds := make(kube.ResourceList, 0)
for _, obj := range chart.CRDObjects() {
// Read in the resources
res, err := cfg.KubeClient.Build(bytes.NewBuffer(obj.File.Data), false)
res, err := r.config.KubeClient.Build(bytes.NewBuffer(obj.File.Data), false)
if err != nil {
cfg.Log("failed to parse CRDs from %s: %s", obj.Name, err)
r.config.Log("failed to parse CRDs from %s: %s", obj.Name, err)
return errors.New(fmt.Sprintf("failed to parse CRDs from %s: %s", obj.Name, err))
}
allCrds = append(allCrds, res...)
}
totalItems := []*resource.Info{}

// Visit CRDs with any provided visitor functions.
for _, visitor := range visitorFunc {
if err := allCrds.Visit(visitor); err != nil {
return err
}
}

var totalItems []*resource.Info
switch policy {
case v2.Skip:
break
case v2.Create:
for i := range allCrds {
if rr, err := cfg.KubeClient.Create(allCrds[i : i+1]); err != nil {
if rr, err := r.config.KubeClient.Create(allCrds[i : i+1]); err != nil {
crdName := allCrds[i].Name
// If the error is CRD already exists, continue.
if apierrors.IsAlreadyExists(err) {
cfg.Log("CRD %s is already present. Skipping.", crdName)
r.config.Log("CRD %s is already present. Skipping.", crdName)
if rr != nil && rr.Created != nil {
totalItems = append(totalItems, rr.Created...)
}
continue
}
cfg.Log("failed to create CRD %s: %s", crdName, err)
r.config.Log("failed to create CRD %s: %s", crdName, err)
return errors.New(fmt.Sprintf("failed to create CRD %s: %s", crdName, err))
} else {
if rr != nil && rr.Created != nil {
totalItems = append(totalItems, rr.Created...)
}
}
}
break
case v2.CreateReplace:
config, err := r.config.RESTClientGetter.ToRESTConfig()
if err != nil {
r.logBuffer.Log("Error while creating Kubernetes client config: %s", err)
r.config.Log("Error while creating Kubernetes client config: %s", err)
return err
}
clientset, err := apiextension.NewForConfig(config)
if err != nil {
r.logBuffer.Log("Error while creating Kubernetes clientset for apiextension: %s", err)
r.config.Log("Error while creating Kubernetes clientset for apiextension: %s", err)
return err
}
client := clientset.ApiextensionsV1().CustomResourceDefinitions()
original := make(kube.ResourceList, 0)
// Note, we build the originals from the current set of CRDs
// and therefore this upgrade will never delete CRDs that existed in the former release
// but no longer exist in the current release.
for _, r := range allCrds {
if o, err := client.Get(context.TODO(), r.Name, v1.GetOptions{}); err == nil && o != nil {
for _, res := range allCrds {
if o, err := client.Get(context.TODO(), res.Name, metav1.GetOptions{}); err == nil && o != nil {
o.GetResourceVersion()
original = append(original, &resource.Info{
Client: clientset.ApiextensionsV1().RESTClient(),
Mapping: &meta.RESTMapping{
Resource: schema.GroupVersionResource{
Group: "apiextensions.k8s.io",
Version: r.Mapping.GroupVersionKind.Version,
Version: res.Mapping.GroupVersionKind.Version,
Resource: "customresourcedefinition",
},
GroupVersionKind: schema.GroupVersionKind{
Kind: "CustomResourceDefinition",
Group: "apiextensions.k8s.io",
Version: r.Mapping.GroupVersionKind.Version,
Version: res.Mapping.GroupVersionKind.Version,
},
Scope: &rootScoped{},
},
Expand All @@ -289,13 +297,13 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart
ResourceVersion: o.ObjectMeta.ResourceVersion,
})
} else if !apierrors.IsNotFound(err) {
cfg.Log("failed to get CRD %s: %s", r.Name, err)
r.config.Log("failed to get CRD %s: %s", res.Name, err)
return err
}
}
// Send them to Kube
if rr, err := cfg.KubeClient.Update(original, allCrds, true); err != nil {
cfg.Log("failed to apply CRD %s", err)
if rr, err := r.config.KubeClient.Update(original, allCrds, true); err != nil {
r.config.Log("failed to apply CRD %s", err)
return errors.New(fmt.Sprintf("failed to apply CRD %s", err))
} else {
if rr != nil {
Expand All @@ -310,21 +318,21 @@ func (r *Runner) applyCRDs(policy v2.CRDsPolicy, hr v2.HelmRelease, chart *chart
}
}
}
break
}

if len(totalItems) > 0 {
// Invalidate the local cache, since it will not have the new CRDs
// present.
discoveryClient, err := cfg.RESTClientGetter.ToDiscoveryClient()
discoveryClient, err := r.config.RESTClientGetter.ToDiscoveryClient()
if err != nil {
cfg.Log("Error in cfg.RESTClientGetter.ToDiscoveryClient(): %s", err)
r.config.Log("Error in cfg.RESTClientGetter.ToDiscoveryClient(): %s", err)
return err
}
cfg.Log("Clearing discovery cache")
r.config.Log("Clearing discovery cache")
discoveryClient.Invalidate()
// Give time for the CRD to be recognized.
if err := cfg.KubeClient.Wait(totalItems, 60*time.Second); err != nil {
cfg.Log("Error waiting for items: %s", err)
if err := r.config.KubeClient.Wait(totalItems, 60*time.Second); err != nil {
r.config.Log("Error waiting for items: %s", err)
return err
}
// Make sure to force a rebuild of the cache.
Expand Down Expand Up @@ -402,3 +410,45 @@ func wrapActionErr(log *LogBuffer, err error) error {
}
return err
}

func setOriginVisitor(namespace, name string) resource.VisitorFunc {
return func(info *resource.Info, err error) error {
if err != nil {
return err
}
if err = mergeLabels(info.Object, originLabels(namespace, name)); err != nil {
return fmt.Errorf(
"%s origin labels could not be updated: %s",
resourceString(info), err,
)
}
return nil
}
}

func mergeLabels(obj runtime.Object, labels map[string]string) error {
current, err := accessor.Labels(obj)
if err != nil {
return err
}
return accessor.SetLabels(obj, mergeStrStrMaps(current, labels))
}

func resourceString(info *resource.Info) string {
_, k := info.Mapping.GroupVersionKind.ToAPIVersionAndKind()
return fmt.Sprintf(
"%s %q in namespace %q",
k, info.Name, info.Namespace,
)
}

func mergeStrStrMaps(current, desired map[string]string) map[string]string {
result := make(map[string]string)
for k, v := range current {
result[k] = v
}
for k, desiredVal := range desired {
result[k] = desiredVal
}
return result
}

0 comments on commit c76c2c5

Please sign in to comment.