diff --git a/chart/flux/templates/helm-operator-crd.yaml b/chart/flux/templates/helm-operator-crd.yaml index e6bfdd6b1..999062650 100644 --- a/chart/flux/templates/helm-operator-crd.yaml +++ b/chart/flux/templates/helm-operator-crd.yaml @@ -21,6 +21,8 @@ spec: shortNames: - hr scope: Namespaced + subresources: + status: {} version: v1beta1 versions: - name: v1beta1 diff --git a/cmd/helm-operator/main.go b/cmd/helm-operator/main.go index c63c05231..3b76a4e8c 100644 --- a/cmd/helm-operator/main.go +++ b/cmd/helm-operator/main.go @@ -13,6 +13,7 @@ import ( "github.com/spf13/pflag" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/workqueue" "github.com/weaveworks/flux/checkpoint" clientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" @@ -136,19 +137,19 @@ func main() { cfg, err := clientcmd.BuildConfigFromFlags(*master, *kubeconfig) if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error building kubeconfig: %v", err)) + mainLogger.Log("error", fmt.Sprintf("error building kubeconfig: %v", err)) os.Exit(1) } kubeClient, err := kubernetes.NewForConfig(cfg) if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error building kubernetes clientset: %v", err)) + mainLogger.Log("error", fmt.Sprintf("error building kubernetes clientset: %v", err)) os.Exit(1) } ifClient, err := clientset.NewForConfig(cfg) if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error building integrations clientset: %v", err)) + mainLogger.Log("error", fmt.Sprintf("error building integrations clientset: %v", err)) os.Exit(1) } @@ -169,27 +170,27 @@ func main() { statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace) go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator")) - // release instance is needed during the sync of Charts changes and during the sync of HelmRelease changes + nsOpt := ifinformers.WithNamespace(*namespace) + ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, *chartsSyncInterval, nsOpt) + fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases() + go ifInformerFactory.Start(shutdown) + + queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease") + + // release instance is needed during the sync of git chart changes and during the sync of HelmRelease changes rel := release.New(log.With(logger, "component", "release"), helmClient) chartSync := chartsync.New( log.With(logger, "component", "chartsync"), - chartsync.Polling{Interval: *chartsSyncInterval}, - chartsync.Clients{KubeClient: *kubeClient, IfClient: *ifClient}, + chartsync.Clients{KubeClient: *kubeClient, IfClient: *ifClient, FhrLister: fhrInformer.Lister()}, rel, + queue, chartsync.Config{LogDiffs: *logReleaseDiffs, UpdateDeps: *updateDependencies, GitTimeout: *gitTimeout, GitPollInterval: *gitPollInterval}, *namespace, - statusUpdater, ) chartSync.Run(shutdown, errc, shutdownWg) - nsOpt := ifinformers.WithNamespace(*namespace) - ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, 30*time.Second, nsOpt) - fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases() - // start FluxRelease informer - opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, chartSync) - go ifInformerFactory.Start(shutdown) - + opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, queue, chartSync) checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint")) // start HTTP server diff --git a/deploy-helm/flux-helm-release-crd.yaml b/deploy-helm/flux-helm-release-crd.yaml index 1a633bf3c..d9bf9a81f 100644 --- a/deploy-helm/flux-helm-release-crd.yaml +++ b/deploy-helm/flux-helm-release-crd.yaml @@ -12,6 +12,8 @@ spec: shortNames: - hr scope: Namespaced + subresources: + status: {} version: v1beta1 versions: - name: v1beta1 diff --git a/integrations/apis/flux.weave.works/v1beta1/types.go b/integrations/apis/flux.weave.works/v1beta1/types.go index 4dea9fcf1..d38fbf899 100644 --- a/integrations/apis/flux.weave.works/v1beta1/types.go +++ b/integrations/apis/flux.weave.works/v1beta1/types.go @@ -12,7 +12,6 @@ import ( ) // +genclient -// +genclient:noStatus // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // FluxHelmRelease represents custom resource associated with a Helm Chart diff --git a/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/fake/fake_helmrelease.go b/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/fake/fake_helmrelease.go index 8c4c23fc1..130923d9d 100644 --- a/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/fake/fake_helmrelease.go +++ b/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/fake/fake_helmrelease.go @@ -100,6 +100,18 @@ func (c *FakeHelmReleases) Update(helmRelease *v1beta1.HelmRelease) (result *v1b return obj.(*v1beta1.HelmRelease), err } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeHelmReleases) UpdateStatus(helmRelease *v1beta1.HelmRelease) (*v1beta1.HelmRelease, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(helmreleasesResource, "status", c.ns, helmRelease), &v1beta1.HelmRelease{}) + + if obj == nil { + return nil, err + } + return obj.(*v1beta1.HelmRelease), err +} + // Delete takes name of the helmRelease and deletes it. Returns an error if one occurs. func (c *FakeHelmReleases) Delete(name string, options *v1.DeleteOptions) error { _, err := c.Fake. diff --git a/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/helmrelease.go b/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/helmrelease.go index 7f0f7f717..bd81cd27e 100644 --- a/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/helmrelease.go +++ b/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1/helmrelease.go @@ -39,6 +39,7 @@ type HelmReleasesGetter interface { type HelmReleaseInterface interface { Create(*v1beta1.HelmRelease) (*v1beta1.HelmRelease, error) Update(*v1beta1.HelmRelease) (*v1beta1.HelmRelease, error) + UpdateStatus(*v1beta1.HelmRelease) (*v1beta1.HelmRelease, error) Delete(name string, options *v1.DeleteOptions) error DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error Get(name string, options v1.GetOptions) (*v1beta1.HelmRelease, error) @@ -132,6 +133,22 @@ func (c *helmReleases) Update(helmRelease *v1beta1.HelmRelease) (result *v1beta1 return } +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *helmReleases) UpdateStatus(helmRelease *v1beta1.HelmRelease) (result *v1beta1.HelmRelease, err error) { + result = &v1beta1.HelmRelease{} + err = c.client.Put(). + Namespace(c.ns). + Resource("helmreleases"). + Name(helmRelease.Name). + SubResource("status"). + Body(helmRelease). + Do(). + Into(result) + return +} + // Delete takes name of the helmRelease and deletes it. Returns an error if one occurs. func (c *helmReleases) Delete(name string, options *v1.DeleteOptions) error { return c.client.Delete(). diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index f437e1e9d..681f578d5 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -8,19 +8,17 @@ reconciled: 1a. There is a HelmRelease resource, but no corresponding release. This can happen when the helm operator is first run, for - example. The ChartChangeSync periodically checks for this by - running through the resources and installing any that aren't - released already. + example. 1b. The release corresponding to a HelmRelease has been updated by some other means, perhaps while the operator wasn't running. This - is also checked periodically, by doing a dry-run release and - comparing the result to the release. + is also checked, by doing a dry-run release and comparing the result + to the release. 2. The chart has changed in git, meaning the release is out of - date. The ChartChangeSync responds to new git commits by looking at - each chart that's referenced by a HelmRelease, and if it's - changed since the last seen commit, updating the release. + date. The ChartChangeSync responds to new git commits by looking up + each chart that makes use of the mirror that has new commits, + replacing the clone for that chart, and scheduling a new release. 1a.) and 1b.) run on the same schedule, and 2.) is run when a git mirror reports it has fetched from upstream _and_ (upon checking) the @@ -44,6 +42,8 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/labels" + "github.com/go-kit/kit/log" google_protobuf "github.com/golang/protobuf/ptypes/any" "github.com/google/go-cmp/cmp" @@ -52,12 +52,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" hapi_chart "k8s.io/helm/pkg/proto/hapi/chart" hapi_release "k8s.io/helm/pkg/proto/hapi/release" "github.com/weaveworks/flux/git" + "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1" fluxv1beta1 "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1" ifclientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" + iflister "github.com/weaveworks/flux/integrations/client/listers/flux.weave.works/v1beta1" helmop "github.com/weaveworks/flux/integrations/helm" "github.com/weaveworks/flux/integrations/helm/release" "github.com/weaveworks/flux/integrations/helm/status" @@ -75,13 +78,10 @@ const ( ReasonSuccess = "HelmSuccess" ) -type Polling struct { - Interval time.Duration -} - type Clients struct { KubeClient kubernetes.Clientset IfClient ifclientset.Clientset + FhrLister iflister.HelmReleaseLister } type Config struct { @@ -108,13 +108,19 @@ type clone struct { head string } +// ReleaseQueue is an add-only workqueue.RateLimitingInterface +type ReleaseQueue interface { + AddRateLimited(item interface{}) +} + type ChartChangeSync struct { - Polling - logger log.Logger - kubeClient kubernetes.Clientset - ifClient ifclientset.Clientset - release *release.Release - config Config + logger log.Logger + kubeClient kubernetes.Clientset + ifClient ifclientset.Clientset + fhrLister iflister.HelmReleaseLister + release *release.Release + releaseQueue ReleaseQueue + config Config mirrors *git.Mirrors @@ -124,17 +130,18 @@ type ChartChangeSync struct { namespace string } -func New(logger log.Logger, polling Polling, clients Clients, release *release.Release, config Config, namespace string, statusUpdater *status.Updater) *ChartChangeSync { +func New(logger log.Logger, clients Clients, release *release.Release, releaseQueue ReleaseQueue, config Config, namespace string) *ChartChangeSync { return &ChartChangeSync{ - logger: logger, - Polling: polling, - kubeClient: clients.KubeClient, - ifClient: clients.IfClient, - release: release, - config: config.WithDefaults(), - mirrors: git.NewMirrors(), - clones: make(map[string]clone), - namespace: namespace, + logger: logger, + kubeClient: clients.KubeClient, + ifClient: clients.IfClient, + fhrLister: clients.FhrLister, + release: release, + releaseQueue: releaseQueue, + config: config.WithDefaults(), + mirrors: git.NewMirrors(), + clones: make(map[string]clone), + namespace: namespace, } } @@ -142,7 +149,8 @@ func New(logger log.Logger, polling Polling, clients Clients, release *release.R // Helm releases in the cluster, what HelmRelease declare, and // changes in the git repos mentioned by any HelmRelease. func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *sync.WaitGroup) { - chs.logger.Log("info", "Starting charts sync loop") + chs.logger.Log("info", "starting git chart sync loop") + wg.Add(1) go func() { defer runtime.HandleCrash() @@ -151,109 +159,101 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn wg.Done() }() - ticker := time.NewTicker(chs.Polling.Interval) - defer ticker.Stop() - for { select { - case reposChanged := <-chs.mirrors.Changes(): - // TODO(michael): the inefficient way, for now, until - // it's clear how to better optimalise it - resources, err := chs.getCustomResources() - if err != nil { - chs.logger.Log("warning", "failed to get custom resources", "err", err) - continue - } - for _, fhr := range resources { - if fhr.Spec.ChartSource.GitChartSource == nil { - continue - } - - repoURL := fhr.Spec.ChartSource.GitChartSource.GitURL - repoName := mirrorName(fhr.Spec.ChartSource.GitChartSource) - - if _, ok := reposChanged[repoName]; !ok { + case mirrorsChanged := <-chs.mirrors.Changes(): + for mirror := range mirrorsChanged { + resources, err := chs.getCustomResourcesForMirror(mirror) + if err != nil { + chs.logger.Log("warning", "failed to get custom resources", "err", err) continue } - repo, ok := chs.mirrors.Get(repoName) + // Retrieve the mirror we got a change signal for + repo, ok := chs.mirrors.Get(mirror) if !ok { // Then why .. did you say .. it had changed? It may have been removed. Add it back and let it signal again. - chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git mirror missing; starting mirroring again") - chs.logger.Log("warning", "mirrored git repo disappeared after signalling change", "repo", repoName) - chs.maybeMirror(fhr) + chs.logger.Log("warning", "mirrored git repo disappeared after signalling change", "repo", mirror) + for _, fhr := range resources { + chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git mirror missing; starting mirroring again") + chs.maybeMirror(fhr) + } continue } + // Ensure the repo is ready status, err := repo.Status() if status != git.RepoReady { - chs.logger.Log("info", "repo not ready yet, while attempting chart sync", "repo", repoURL, "status", string(status)) - // TODO(michael) log if there's a problem with the following? - chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, err.Error()) - continue - } - - ref := fhr.Spec.ChartSource.GitChartSource.RefOrDefault() - path := fhr.Spec.ChartSource.GitChartSource.Path - releaseName := release.GetReleaseName(fhr) - - ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) - refHead, err := repo.Revision(ctx, ref) - cancel() - if err != nil { - chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", repoURL, "ref", ref, "err", err) + chs.logger.Log("info", "repo not ready yet, while attempting chart sync", "repo", mirror, "status", string(status)) + for _, fhr := range resources { + // TODO(michael) log if there's a problem with the following? + chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, err.Error()) + } continue } - // This FHR is using a git repo; and, it appears to have had commits since we last saw it. - // Check explicitly whether we should update its clone. - chs.clonesMu.Lock() - cloneForChart, ok := chs.clones[releaseName] - chs.clonesMu.Unlock() + // Determine if we need to update the clone and + // schedule an upgrade for every HelmRelease that + // makes use of the mirror + for _, fhr := range resources { + ref := fhr.Spec.ChartSource.GitChartSource.RefOrDefault() + path := fhr.Spec.ChartSource.GitChartSource.Path + releaseName := release.GetReleaseName(fhr) - if ok { // found clone ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) - commits, err := repo.CommitsBetween(ctx, cloneForChart.head, refHead, path) + refHead, err := repo.Revision(ctx, ref) cancel() if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", repoURL, "ref", ref, "err", err) + chs.logger.Log("warning", "could not get revision for ref while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err) continue } - ok = len(commits) == 0 - } - if !ok { // didn't find clone, or it needs updating - ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) - newClone, err := repo.Export(ctx, refHead) - cancel() - if err != nil { - chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not clone from mirror while checking for changes", "repo", repoURL, "ref", ref, "err", err) - continue - } - newCloneForChart := clone{remote: repoURL, ref: ref, head: refHead, export: newClone} + // The git repo of this appears to have had commits since we last saw it, + // check explicitly whether we should update its clone. chs.clonesMu.Lock() - chs.clones[releaseName] = newCloneForChart + cloneForChart, ok := chs.clones[releaseName] chs.clonesMu.Unlock() - if cloneForChart.export != nil { - cloneForChart.export.Clean() + + if ok { // found clone + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + commits, err := repo.CommitsBetween(ctx, cloneForChart.head, refHead, path) + cancel() + if err != nil { + chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) + chs.logger.Log("warning", "could not get revision for ref while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err) + continue + } + ok = len(commits) == 0 } - } - chs.reconcileReleaseDef(fhr) - } - case <-ticker.C: - // Re-release any chart releases that have apparently - // changed in the cluster. - chs.logger.Log("info", fmt.Sprint("Start of releasesync")) - err := chs.reapplyReleaseDefs() - if err != nil { - chs.logger.Log("error", fmt.Sprintf("Failure to do manual release sync: %s", err)) - } - chs.logger.Log("info", fmt.Sprint("End of releasesync")) + if !ok { // didn't find clone, or it needs updating + ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + newClone, err := repo.Export(ctx, refHead) + cancel() + if err != nil { + chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) + chs.logger.Log("warning", "could not clone from mirror while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err) + continue + } + newCloneForChart := clone{remote: mirror, ref: ref, head: refHead, export: newClone} + chs.clonesMu.Lock() + chs.clones[releaseName] = newCloneForChart + chs.clonesMu.Unlock() + if cloneForChart.export != nil { + cloneForChart.export.Clean() + } + } + // Enqueue release + cacheKey, err := cache.MetaNamespaceKeyFunc(fhr.GetObjectMeta()) + if err != nil { + continue + } + chs.logger.Log("info", "enqueing release upgrade due to change in git chart source", "resource", fhr.ResourceID().String()) + chs.releaseQueue.AddRateLimited(cacheKey) + } + } case <-stopCh: chs.logger.Log("stopping", "true") return @@ -315,7 +315,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { // is being referenced in the chart source. if ok { ok = chartClone.remote == chartSource.GitURL && chartClone.ref == chartSource.RefOrDefault() - if !ok { + if !ok { if chartClone.export != nil { chartClone.export.Clean() } @@ -332,12 +332,12 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { if !ok { chs.maybeMirror(fhr) chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo "+chartSource.GitURL+" not mirrored yet") - chs.logger.Log("info", "chart repo not cloned yet", "releaseName", releaseName, "resource", fmt.Sprintf("%s:%s/%s", fhr.Namespace, fhr.Kind, fhr.Name)) + chs.logger.Log("info", "chart repo not cloned yet", "resource", fhr.ResourceID().String()) } else { status, err := repo.Status() if status != git.RepoReady { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo not mirrored yet: "+err.Error()) - chs.logger.Log("info", "chart repo not ready yet", "releaseName", releaseName, "resource", fmt.Sprintf("%s:%s/%s", fhr.Namespace, fhr.Kind, fhr.Name), "status", string(status), "err", err) + chs.logger.Log("info", "chart repo not ready yet", "resource", fhr.ResourceID().String(), "status", string(status), "err", err) } } return @@ -349,7 +349,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { if chs.config.UpdateDeps && !fhr.Spec.ChartSource.GitChartSource.SkipDepUpdate { if err := updateDependencies(chartPath, ""); err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonDependencyFailed, err.Error()) - chs.logger.Log("warning", "Failed to update chart dependencies", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to update chart dependencies", "resource", fhr.ResourceID().String(), "err", err) return } } @@ -358,7 +358,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { path, err := ensureChartFetched(chs.config.ChartCache, chartSource) if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonDownloadFailed, "chart download failed: "+err.Error()) - chs.logger.Log("info", "chart download failed", "releaseName", releaseName, "resource", fhr.ResourceID().String(), "err", err) + chs.logger.Log("info", "chart download failed", "resource", fhr.ResourceID().String(), "err", err) return } chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionTrue, ReasonDownloaded, "chart fetched: "+filepath.Base(path)) @@ -370,7 +370,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { _, err := chs.release.Install(chartPath, releaseName, fhr, release.InstallAction, opts, &chs.kubeClient) if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonInstallFailed, err.Error()) - chs.logger.Log("warning", "Failed to install chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to install chart", "resource", fhr.ResourceID().String(), "err", err) return } chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm install succeeded") @@ -382,40 +382,33 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { changed, err := chs.shouldUpgrade(chartPath, rel, fhr) if err != nil { - chs.logger.Log("warning", "Unable to determine if release has changed", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "unable to determine if release has changed", "resource", fhr.ResourceID().String(), "err", err) return } if changed { - _, err := chs.release.Install(chartPath, releaseName, fhr, release.UpgradeAction, opts, &chs.kubeClient) + cFhr, err := chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace).Get(fhr.Name, metav1.GetOptions{}) + if err != nil { + chs.logger.Log("warning", "failed to retrieve HelmRelease scheduled for upgrade", "resource", fhr.ResourceID().String(), "err", err) + return + } + if diff := cmp.Diff(fhr.Spec, cFhr.Spec); diff != "" { + chs.logger.Log("warning", "HelmRelease spec has diverged since we calculated if we should upgrade, skipping upgrade", "resource", fhr.ResourceID().String()) + return + } + _, err = chs.release.Install(chartPath, releaseName, fhr, release.UpgradeAction, opts, &chs.kubeClient) if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonUpgradeFailed, err.Error()) - chs.logger.Log("warning", "Failed to upgrade chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to upgrade chart", "resource", fhr.ResourceID().String(), "err", err) return } chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm upgrade succeeded") if err = status.UpdateReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil { - chs.logger.Log("warning", "could not update the release revision", "namespace", fhr.Namespace, "resource", fhr.Name, "err", err) + chs.logger.Log("warning", "could not update the release revision", "resource", fhr.ResourceID().String(), "err", err) } return } } -// reapplyReleaseDefs goes through the resource definitions and -// reconciles them with Helm releases. This is a "backstop" for the -// other sync processes, to cover the case of a release being changed -// out-of-band (e.g., by someone using `helm upgrade`). -func (chs *ChartChangeSync) reapplyReleaseDefs() error { - resources, err := chs.getCustomResources() - if err != nil { - return fmt.Errorf("failed to get HelmRelease resources from the API server: %s", err.Error()) - } - - for _, fhr := range resources { - chs.reconcileReleaseDef(fhr) - } - return nil -} - // DeleteRelease deletes the helm release associated with a // HelmRelease. This exists mainly so that the operator code can // call it when it is handling a resource deletion. @@ -424,7 +417,7 @@ func (chs *ChartChangeSync) DeleteRelease(fhr fluxv1beta1.HelmRelease) { name := release.GetReleaseName(fhr) err := chs.release.Delete(name) if err != nil { - chs.logger.Log("warning", "Chart release not deleted", "release", name, "error", err) + chs.logger.Log("warning", "chart release not deleted", "resource", fhr.ResourceID().String(), "release", name, "err", err) } // Remove the clone we may have for this HelmRelease @@ -441,39 +434,27 @@ func (chs *ChartChangeSync) DeleteRelease(fhr fluxv1beta1.HelmRelease) { // SyncMirrors instructs all mirrors to refresh from their upstream. func (chs *ChartChangeSync) SyncMirrors() { - chs.logger.Log("info", "Starting mirror sync") + chs.logger.Log("info", "starting mirror sync") for _, err := range chs.mirrors.RefreshAll(chs.config.GitTimeout) { - chs.logger.Log("error", fmt.Sprintf("Failure while syncing mirror: %s", err)) + chs.logger.Log("error", fmt.Sprintf("failure while syncing mirror: %s", err)) } - chs.logger.Log("info", "Finished syncing mirrors") + chs.logger.Log("info", "finished syncing mirrors") } -// getCustomResources assembles all custom resources in all namespaces -// or in the allowed namespace if specified -func (chs *ChartChangeSync) getCustomResources() ([]fluxv1beta1.HelmRelease, error) { - var namespaces []string - if chs.namespace != "" { - namespaces = append(namespaces, chs.namespace) - } else { - nso, err := chs.kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{}) - if err != nil { - return nil, fmt.Errorf("Failure while retrieving kubernetes namespaces: %s", err) - } - for _, n := range nso.Items { - namespaces = append(namespaces, n.GetName()) - } +// getCustomResourcesForMirror retrieves all the resources that make +// use of the given mirror from the lister. +func (chs *ChartChangeSync) getCustomResourcesForMirror(mirror string) ([]fluxv1beta1.HelmRelease, error) { + var fhrs []v1beta1.HelmRelease + list, err := chs.fhrLister.List(labels.Everything()) + if err != nil { + return nil, err } - var fhrs []fluxv1beta1.HelmRelease - for _, ns := range namespaces { - list, err := chs.ifClient.FluxV1beta1().HelmReleases(ns).List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - - for _, fhr := range list.Items { - fhrs = append(fhrs, fhr) + for _, fhr := range list { + if mirror != mirrorName(fhr.Spec.GitChartSource) { + continue } + fhrs = append(fhrs, *fhr) } return fhrs, nil } @@ -547,7 +528,7 @@ func sortChartFields(c *hapi_chart.Chart) *hapi_chart.Chart { // doing a dry run install from the chart in the git repo. func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_release.Release, fhr fluxv1beta1.HelmRelease) (bool, error) { if currRel == nil { - return false, fmt.Errorf("No Chart release provided for %v", fhr.GetName()) + return false, fmt.Errorf("no chart release provided for %v", fhr.GetName()) } currVals := currRel.GetConfig() @@ -566,18 +547,18 @@ func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_relea // compare values && Chart if diff := cmp.Diff(currVals, desVals); diff != "" { if chs.config.LogDiffs { - chs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName()), "diff", diff) + chs.logger.Log("error", fmt.Sprintf("release %s: values have diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String(), "diff", diff) } else { - chs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName())) + chs.logger.Log("error", fmt.Sprintf("release %s: values have diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String()) } return true, nil } if diff := cmp.Diff(sortChartFields(currChart), sortChartFields(desChart)); diff != "" { if chs.config.LogDiffs { - chs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName()), "diff", diff) + chs.logger.Log("error", fmt.Sprintf("release %s: chart has diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String(), "diff", diff) } else { - chs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName())) + chs.logger.Log("error", fmt.Sprintf("release %s: chart has diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String()) } return true, nil } diff --git a/integrations/helm/helm.go b/integrations/helm/helm.go index afd8e4403..f803ee8c3 100644 --- a/integrations/helm/helm.go +++ b/integrations/helm/helm.go @@ -75,7 +75,7 @@ func ClientSetup(logger log.Logger, kubeClient *kubernetes.Clientset, tillerOpts for { helmClient, host, err = newClient(kubeClient, tillerOpts) if err != nil { - logger.Log("error", fmt.Sprintf("Error creating helm client: %s", err.Error())) + logger.Log("error", fmt.Sprintf("error creating helm client: %s", err.Error())) time.Sleep(20 * time.Second) continue } diff --git a/integrations/helm/http/daemon/server.go b/integrations/helm/http/daemon/server.go index f254d9981..d7a1cedc7 100644 --- a/integrations/helm/http/daemon/server.go +++ b/integrations/helm/http/daemon/server.go @@ -3,14 +3,15 @@ package daemon import ( "context" "fmt" + "net/http" + "sync/atomic" + "time" + "github.com/go-kit/kit/log" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/weaveworks/flux/integrations/helm/api" transport "github.com/weaveworks/flux/integrations/helm/http" - "net/http" - "sync/atomic" - "time" ) // ListenAndServe starts a HTTP server instrumented with Prometheus metrics, @@ -37,7 +38,7 @@ func ListenAndServe(listenAddr string, apiServer api.Server, logger log.Logger, IdleTimeout: 15 * time.Second, } - logger.Log("info", fmt.Sprintf("Starting HTTP server on %s", listenAddr)) + logger.Log("info", fmt.Sprintf("starting HTTP server on %s", listenAddr)) // run server in background go func() { diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index 22f1cdddb..30761ab67 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -7,7 +7,6 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/golang/glog" "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -40,12 +39,9 @@ const ( // a HelmRelease fails to be released/updated ErrChartSync = "ErrChartSync" - // MessageChartSynced - the message used for Events when a resource - // fails to sync due to failing to release the Chart - MessageChartSynced = "Chart managed by HelmRelease processed successfully" - // MessageErrChartSync - the message used for an Event fired when a HelmRelease - // is synced successfully - MessageErrChartSync = "Chart %s managed by HelmRelease failed to be processed" + // MessageChartSynced - the message used for an Event fired when a HelmRelease + // is synced. + MessageChartSynced = "Chart managed by HelmRelease processed" ) // Controller is the operator implementation for HelmRelease resources @@ -76,13 +72,13 @@ func New( logReleaseDiffs bool, kubeclientset kubernetes.Interface, fhrInformer fhrv1.HelmReleaseInformer, + releaseWorkqueue workqueue.RateLimitingInterface, sync *chartsync.ChartChangeSync) *Controller { // Add helm-operator types to the default Kubernetes Scheme so Events can be // logged for helm-operator types. ifscheme.AddToScheme(scheme.Scheme) eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) @@ -91,18 +87,16 @@ func New( logDiffs: logReleaseDiffs, fhrLister: fhrInformer.Lister(), fhrSynced: fhrInformer.Informer().HasSynced, - releaseWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease"), + releaseWorkqueue: releaseWorkqueue, recorder: recorder, sync: sync, } - controller.logger.Log("info", "Setting up event handlers") + controller.logger.Log("info", "setting up event handlers") // ----- EVENT HANDLERS for HelmRelease resources change --------- fhrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(new interface{}) { - controller.logger.Log("info", "CREATING release") - controller.logger.Log("info", "Custom Resource driven release install") _, ok := checkCustomResourceType(controller.logger, new) if ok { controller.enqueueJob(new) @@ -118,7 +112,7 @@ func New( } }, }) - controller.logger.Log("info", "Event handlers set up") + controller.logger.Log("info", "event handlers set up") return controller } @@ -131,16 +125,16 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG defer runtime.HandleCrash() defer c.releaseWorkqueue.ShutDown() - c.logger.Log("info", "Starting operator") + c.logger.Log("info", "starting operator") // Wait for the caches to be synced before starting workers - c.logger.Log("info", "Waiting for informer caches to sync") + c.logger.Log("info", "waiting for informer caches to sync") if ok := cache.WaitForCacheSync(stopCh, c.fhrSynced); !ok { return errors.New("failed to wait for caches to sync") } - c.logger.Log("info", "Informer caches synced") + c.logger.Log("info", "unformer caches synced") - c.logger.Log("info", "Starting workers") + c.logger.Log("info", "starting workers") for i := 0; i < threadiness; i++ { wg.Add(1) go wait.Until(c.runWorker, time.Second, stopCh) @@ -150,7 +144,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG for i := 0; i < threadiness; i++ { wg.Done() } - c.logger.Log("info", "Stopping workers") + c.logger.Log("info", "stopping workers") return nil } @@ -166,11 +160,7 @@ func (c *Controller) runWorker() { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the syncHandler. func (c *Controller) processNextWorkItem() bool { - c.logger.Log("debug", "Processing next work queue job ...") - obj, shutdown := c.releaseWorkqueue.Get() - c.logger.Log("debug", fmt.Sprintf("PROCESSING item [%#v]", obj)) - if shutdown { return false } @@ -197,7 +187,7 @@ func (c *Controller) processNextWorkItem() bool { // Forget not to get into a loop of attempting to // process a work item that is invalid. c.releaseWorkqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("Expected string in workqueue but got %#v", obj)) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } @@ -205,14 +195,11 @@ func (c *Controller) processNextWorkItem() bool { // HelmRelease resource to sync the corresponding Chart release. // If the sync failed, then we return while the item will get requeued if err := c.syncHandler(key); err != nil { - return fmt.Errorf("error syncing '%s': %s", key, err.Error()) + return fmt.Errorf("errored syncing HelmRelease '%s': %s", key, err.Error()) } // If no error occurs we Forget this item so it does not // get queued again until another change happens. c.releaseWorkqueue.Forget(obj) - - c.logger.Log("info", fmt.Sprintf("Successfully synced '%s'", key)) - return nil }(obj) @@ -226,13 +213,11 @@ func (c *Controller) processNextWorkItem() bool { // syncHandler acts according to the action // Deletes/creates or updates a Chart release func (c *Controller) syncHandler(key string) error { - c.logger.Log("debug", fmt.Sprintf("Starting to sync cache key %s", key)) - // Retrieve namespace and Custom Resource name from the key namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - c.logger.Log("info", fmt.Sprintf("Invalid cache key: %v", err)) - runtime.HandleError(fmt.Errorf("Invalid cache key: %s", key)) + c.logger.Log("error", fmt.Sprintf("key '%s' is invalid: %v", key, err)) + runtime.HandleError(fmt.Errorf("key '%s' is invalid", key)) return nil } @@ -297,19 +282,23 @@ func (c *Controller) enqueueUpdateJob(old, new interface{}) { return } - if diff := cmp.Diff(oldFhr.Spec, newFhr.Spec); diff != "" { - c.logger.Log("info", "UPGRADING release") - if c.logDiffs { - c.logger.Log("info", "Custom Resource driven release upgrade", "diff", diff) - } else { - c.logger.Log("info", "Custom Resource driven release upgrade") - } - c.enqueueJob(new) + log := []string{"info", "enqueuing release upgrade"} + if diff := cmp.Diff(oldFhr.Spec, newFhr.Spec); diff != "" && c.logDiffs { + log = append(log, "diff", diff) + } + log = append(log, "resource", newFhr.ResourceID().String()) + + l := make([]interface{}, len(log)) + for i, v := range log { + l[i] = v } + + c.logger.Log(l...) + + c.enqueueJob(new) } func (c *Controller) deleteRelease(fhr flux_v1beta1.HelmRelease) { - c.logger.Log("info", "DELETING release") - c.logger.Log("info", "Custom Resource driven release deletion") + c.logger.Log("info", "deleting release", "resource", fhr.ResourceID().String()) c.sync.DeleteRelease(fhr) } diff --git a/integrations/helm/status/conditions.go b/integrations/helm/status/conditions.go index 0d154b588..6715675ed 100644 --- a/integrations/helm/status/conditions.go +++ b/integrations/helm/status/conditions.go @@ -1,17 +1,14 @@ package status import ( - "encoding/json" - "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1" v1beta1client "github.com/weaveworks/flux/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1" - "k8s.io/apimachinery/pkg/types" ) // We can't rely on having UpdateStatus, or strategic merge patching // for custom resources. So we have to create an object which // represents the merge path or JSON patch to apply. -func UpdateConditionsPatch(status *v1beta1.HelmReleaseStatus, updates ...v1beta1.HelmReleaseCondition) (types.PatchType, interface{}) { +func UpdateConditionsPatch(status *v1beta1.HelmReleaseStatus, updates ...v1beta1.HelmReleaseCondition) { newConditions := make([]v1beta1.HelmReleaseCondition, len(status.Conditions)) oldConditions := status.Conditions for i, c := range oldConditions { @@ -28,20 +25,15 @@ updates: newConditions = append(newConditions, up) } status.Conditions = newConditions - return types.MergePatchType, map[string]interface{}{ - "status": map[string]interface{}{ - "conditions": newConditions, - }, - } } -// UpdateConditions applies the updates to the HelmRelease given, and patches the resource in the cluster. +// UpdateConditions applies the updates to the HelmRelease given, and +// updates the resource in the cluster. func UpdateConditions(client v1beta1client.HelmReleaseInterface, fhr *v1beta1.HelmRelease, updates ...v1beta1.HelmReleaseCondition) error { - t, obj := UpdateConditionsPatch(&fhr.Status, updates...) - bytes, err := json.Marshal(obj) - if err != nil { - return err - } - _, err = client.Patch(fhr.Name, t, bytes) + fhrCopy := fhr.DeepCopy() + + UpdateConditionsPatch(&fhrCopy.Status, updates...) + _, err := client.UpdateStatus(fhrCopy) + return err } diff --git a/integrations/helm/status/status.go b/integrations/helm/status/status.go index 17b6151f3..d21a49b08 100644 --- a/integrations/helm/status/status.go +++ b/integrations/helm/status/status.go @@ -14,12 +14,10 @@ correspond. Specifically, package status import ( - "encoding/json" "time" "github.com/go-kit/kit/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" kube "k8s.io/client-go/kubernetes" "k8s.io/helm/pkg/helm" @@ -104,34 +102,18 @@ bail: } func UpdateReleaseStatus(client v1beta1client.HelmReleaseInterface, fhr v1beta1.HelmRelease, releaseName, releaseStatus string) error { - patchBytes, err := json.Marshal(map[string]interface{}{ - "status": map[string]interface{}{ - "releaseName": releaseName, - "releaseStatus": releaseStatus, - }, - }) - if err == nil { - // CustomResources don't get - // StrategicMergePatch, for now, but since we - // want to unconditionally set the value, this - // is OK. - _, err = client.Patch(fhr.Name, types.MergePatchType, patchBytes) - } + fhr.Status.ReleaseName = releaseName + fhr.Status.ReleaseStatus = releaseStatus + + _, err := client.UpdateStatus(&fhr) + return err } func UpdateReleaseRevision(client v1beta1client.HelmReleaseInterface, fhr v1beta1.HelmRelease, revision string) error { - patchBytes, err := json.Marshal(map[string]interface{}{ - "status": map[string]interface{}{ - "revision": revision, - }, - }) - if err == nil { - // CustomResources don't get - // StrategicMergePatch, for now, but since we - // want to unconditionally set the value, this - // is OK. - _, err = client.Patch(fhr.Name, types.MergePatchType, patchBytes) - } + fhr.Status.Revision = revision + + _, err := client.UpdateStatus(&fhr) + return err }