diff --git a/cmd/helm-operator/main.go b/cmd/helm-operator/main.go index 270ede645..3b76a4e8c 100644 --- a/cmd/helm-operator/main.go +++ b/cmd/helm-operator/main.go @@ -137,19 +137,19 @@ func main() { cfg, err := clientcmd.BuildConfigFromFlags(*master, *kubeconfig) if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error building kubeconfig: %v", err)) + mainLogger.Log("error", fmt.Sprintf("error building kubeconfig: %v", err)) os.Exit(1) } kubeClient, err := kubernetes.NewForConfig(cfg) if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error building kubernetes clientset: %v", err)) + mainLogger.Log("error", fmt.Sprintf("error building kubernetes clientset: %v", err)) os.Exit(1) } ifClient, err := clientset.NewForConfig(cfg) if err != nil { - mainLogger.Log("error", fmt.Sprintf("Error building integrations clientset: %v", err)) + mainLogger.Log("error", fmt.Sprintf("error building integrations clientset: %v", err)) os.Exit(1) } diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index a6cbe72e2..681f578d5 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -149,7 +149,7 @@ func New(logger log.Logger, clients Clients, release *release.Release, releaseQu // Helm releases in the cluster, what HelmRelease declare, and // changes in the git repos mentioned by any HelmRelease. func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *sync.WaitGroup) { - chs.logger.Log("info", "Starting git chart sync loop") + chs.logger.Log("info", "starting git chart sync loop") wg.Add(1) go func() { @@ -169,27 +169,32 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn continue } + // Retrieve the mirror we got a change signal for repo, ok := chs.mirrors.Get(mirror) if !ok { + // Then why .. did you say .. it had changed? It may have been removed. Add it back and let it signal again. + chs.logger.Log("warning", "mirrored git repo disappeared after signalling change", "repo", mirror) for _, fhr := range resources { - // Then why .. did you say .. it had changed? It may have been removed. Add it back and let it signal again. chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git mirror missing; starting mirroring again") - chs.logger.Log("warning", "mirrored git repo disappeared after signalling change", "repo", mirror) chs.maybeMirror(fhr) } continue } + // Ensure the repo is ready status, err := repo.Status() if status != git.RepoReady { + chs.logger.Log("info", "repo not ready yet, while attempting chart sync", "repo", mirror, "status", string(status)) for _, fhr := range resources { - chs.logger.Log("info", "repo not ready yet, while attempting chart sync", "repo", mirror, "status", string(status)) // TODO(michael) log if there's a problem with the following? chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, err.Error()) } continue } + // Determine if we need to update the clone and + // schedule an upgrade for every HelmRelease that + // makes use of the mirror for _, fhr := range resources { ref := fhr.Spec.ChartSource.GitChartSource.RefOrDefault() path := fhr.Spec.ChartSource.GitChartSource.Path @@ -200,7 +205,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn cancel() if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", mirror, "ref", ref, "err", err) + chs.logger.Log("warning", "could not get revision for ref while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err) continue } @@ -216,7 +221,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn cancel() if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", mirror, "ref", ref, "err", err) + chs.logger.Log("warning", "could not get revision for ref while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err) continue } ok = len(commits) == 0 @@ -228,7 +233,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn cancel() if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error()) - chs.logger.Log("warning", "could not clone from mirror while checking for changes", "repo", mirror, "ref", ref, "err", err) + chs.logger.Log("warning", "could not clone from mirror while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err) continue } newCloneForChart := clone{remote: mirror, ref: ref, head: refHead, export: newClone} @@ -240,10 +245,12 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn } } + // Enqueue release cacheKey, err := cache.MetaNamespaceKeyFunc(fhr.GetObjectMeta()) if err != nil { continue } + chs.logger.Log("info", "enqueing release upgrade due to change in git chart source", "resource", fhr.ResourceID().String()) chs.releaseQueue.AddRateLimited(cacheKey) } } @@ -325,12 +332,12 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { if !ok { chs.maybeMirror(fhr) chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo "+chartSource.GitURL+" not mirrored yet") - chs.logger.Log("info", "chart repo not cloned yet", "releaseName", releaseName, "resource", fmt.Sprintf("%s:%s/%s", fhr.Namespace, fhr.Kind, fhr.Name)) + chs.logger.Log("info", "chart repo not cloned yet", "resource", fhr.ResourceID().String()) } else { status, err := repo.Status() if status != git.RepoReady { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo not mirrored yet: "+err.Error()) - chs.logger.Log("info", "chart repo not ready yet", "releaseName", releaseName, "resource", fmt.Sprintf("%s:%s/%s", fhr.Namespace, fhr.Kind, fhr.Name), "status", string(status), "err", err) + chs.logger.Log("info", "chart repo not ready yet", "resource", fhr.ResourceID().String(), "status", string(status), "err", err) } } return @@ -342,7 +349,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { if chs.config.UpdateDeps && !fhr.Spec.ChartSource.GitChartSource.SkipDepUpdate { if err := updateDependencies(chartPath, ""); err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonDependencyFailed, err.Error()) - chs.logger.Log("warning", "Failed to update chart dependencies", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to update chart dependencies", "resource", fhr.ResourceID().String(), "err", err) return } } @@ -351,7 +358,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { path, err := ensureChartFetched(chs.config.ChartCache, chartSource) if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonDownloadFailed, "chart download failed: "+err.Error()) - chs.logger.Log("info", "chart download failed", "releaseName", releaseName, "resource", fhr.ResourceID().String(), "err", err) + chs.logger.Log("info", "chart download failed", "resource", fhr.ResourceID().String(), "err", err) return } chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionTrue, ReasonDownloaded, "chart fetched: "+filepath.Base(path)) @@ -363,7 +370,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { _, err := chs.release.Install(chartPath, releaseName, fhr, release.InstallAction, opts, &chs.kubeClient) if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonInstallFailed, err.Error()) - chs.logger.Log("warning", "Failed to install chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to install chart", "resource", fhr.ResourceID().String(), "err", err) return } chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm install succeeded") @@ -375,28 +382,28 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) { changed, err := chs.shouldUpgrade(chartPath, rel, fhr) if err != nil { - chs.logger.Log("warning", "Unable to determine if release has changed", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "unable to determine if release has changed", "resource", fhr.ResourceID().String(), "err", err) return } if changed { cFhr, err := chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace).Get(fhr.Name, metav1.GetOptions{}) if err != nil { - chs.logger.Log("warning", "Failed to retrieve HelmRelease scheduled for upgrade", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to retrieve HelmRelease scheduled for upgrade", "resource", fhr.ResourceID().String(), "err", err) return } if diff := cmp.Diff(fhr.Spec, cFhr.Spec); diff != "" { - chs.logger.Log("warning", "HelmRelease spec has diverged since we calculated if we should upgrade, skipping upgrade", "namespace", fhr.Namespace, "name", fhr.Name) + chs.logger.Log("warning", "HelmRelease spec has diverged since we calculated if we should upgrade, skipping upgrade", "resource", fhr.ResourceID().String()) return } _, err = chs.release.Install(chartPath, releaseName, fhr, release.UpgradeAction, opts, &chs.kubeClient) if err != nil { chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonUpgradeFailed, err.Error()) - chs.logger.Log("warning", "Failed to upgrade chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err) + chs.logger.Log("warning", "failed to upgrade chart", "resource", fhr.ResourceID().String(), "err", err) return } chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm upgrade succeeded") if err = status.UpdateReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil { - chs.logger.Log("warning", "could not update the release revision", "namespace", fhr.Namespace, "resource", fhr.Name, "err", err) + chs.logger.Log("warning", "could not update the release revision", "resource", fhr.ResourceID().String(), "err", err) } return } @@ -410,7 +417,7 @@ func (chs *ChartChangeSync) DeleteRelease(fhr fluxv1beta1.HelmRelease) { name := release.GetReleaseName(fhr) err := chs.release.Delete(name) if err != nil { - chs.logger.Log("warning", "Chart release not deleted", "release", name, "error", err) + chs.logger.Log("warning", "chart release not deleted", "resource", fhr.ResourceID().String(), "release", name, "err", err) } // Remove the clone we may have for this HelmRelease @@ -427,11 +434,11 @@ func (chs *ChartChangeSync) DeleteRelease(fhr fluxv1beta1.HelmRelease) { // SyncMirrors instructs all mirrors to refresh from their upstream. func (chs *ChartChangeSync) SyncMirrors() { - chs.logger.Log("info", "Starting mirror sync") + chs.logger.Log("info", "starting mirror sync") for _, err := range chs.mirrors.RefreshAll(chs.config.GitTimeout) { - chs.logger.Log("error", fmt.Sprintf("Failure while syncing mirror: %s", err)) + chs.logger.Log("error", fmt.Sprintf("failure while syncing mirror: %s", err)) } - chs.logger.Log("info", "Finished syncing mirrors") + chs.logger.Log("info", "finished syncing mirrors") } // getCustomResourcesForMirror retrieves all the resources that make @@ -521,7 +528,7 @@ func sortChartFields(c *hapi_chart.Chart) *hapi_chart.Chart { // doing a dry run install from the chart in the git repo. func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_release.Release, fhr fluxv1beta1.HelmRelease) (bool, error) { if currRel == nil { - return false, fmt.Errorf("No Chart release provided for %v", fhr.GetName()) + return false, fmt.Errorf("no chart release provided for %v", fhr.GetName()) } currVals := currRel.GetConfig() @@ -540,18 +547,18 @@ func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_relea // compare values && Chart if diff := cmp.Diff(currVals, desVals); diff != "" { if chs.config.LogDiffs { - chs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName()), "diff", diff) + chs.logger.Log("error", fmt.Sprintf("release %s: values have diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String(), "diff", diff) } else { - chs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName())) + chs.logger.Log("error", fmt.Sprintf("release %s: values have diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String()) } return true, nil } if diff := cmp.Diff(sortChartFields(currChart), sortChartFields(desChart)); diff != "" { if chs.config.LogDiffs { - chs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName()), "diff", diff) + chs.logger.Log("error", fmt.Sprintf("release %s: chart has diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String(), "diff", diff) } else { - chs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName())) + chs.logger.Log("error", fmt.Sprintf("release %s: chart has diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String()) } return true, nil } diff --git a/integrations/helm/helm.go b/integrations/helm/helm.go index afd8e4403..f803ee8c3 100644 --- a/integrations/helm/helm.go +++ b/integrations/helm/helm.go @@ -75,7 +75,7 @@ func ClientSetup(logger log.Logger, kubeClient *kubernetes.Clientset, tillerOpts for { helmClient, host, err = newClient(kubeClient, tillerOpts) if err != nil { - logger.Log("error", fmt.Sprintf("Error creating helm client: %s", err.Error())) + logger.Log("error", fmt.Sprintf("error creating helm client: %s", err.Error())) time.Sleep(20 * time.Second) continue } diff --git a/integrations/helm/http/daemon/server.go b/integrations/helm/http/daemon/server.go index f254d9981..d7a1cedc7 100644 --- a/integrations/helm/http/daemon/server.go +++ b/integrations/helm/http/daemon/server.go @@ -3,14 +3,15 @@ package daemon import ( "context" "fmt" + "net/http" + "sync/atomic" + "time" + "github.com/go-kit/kit/log" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/weaveworks/flux/integrations/helm/api" transport "github.com/weaveworks/flux/integrations/helm/http" - "net/http" - "sync/atomic" - "time" ) // ListenAndServe starts a HTTP server instrumented with Prometheus metrics, @@ -37,7 +38,7 @@ func ListenAndServe(listenAddr string, apiServer api.Server, logger log.Logger, IdleTimeout: 15 * time.Second, } - logger.Log("info", fmt.Sprintf("Starting HTTP server on %s", listenAddr)) + logger.Log("info", fmt.Sprintf("starting HTTP server on %s", listenAddr)) // run server in background go func() { diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index 0965a488f..cc6dcb9fd 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -97,13 +97,11 @@ func New( sync: sync, } - controller.logger.Log("info", "Setting up event handlers") + controller.logger.Log("info", "setting up event handlers") // ----- EVENT HANDLERS for HelmRelease resources change --------- fhrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(new interface{}) { - controller.logger.Log("info", "CREATING release") - controller.logger.Log("info", "Custom Resource driven release install") _, ok := checkCustomResourceType(controller.logger, new) if ok { controller.enqueueJob(new) @@ -119,7 +117,7 @@ func New( } }, }) - controller.logger.Log("info", "Event handlers set up") + controller.logger.Log("info", "event handlers set up") return controller } @@ -132,16 +130,16 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG defer runtime.HandleCrash() defer c.releaseWorkqueue.ShutDown() - c.logger.Log("info", "Starting operator") + c.logger.Log("info", "starting operator") // Wait for the caches to be synced before starting workers - c.logger.Log("info", "Waiting for informer caches to sync") + c.logger.Log("info", "waiting for informer caches to sync") if ok := cache.WaitForCacheSync(stopCh, c.fhrSynced); !ok { return errors.New("failed to wait for caches to sync") } - c.logger.Log("info", "Informer caches synced") + c.logger.Log("info", "unformer caches synced") - c.logger.Log("info", "Starting workers") + c.logger.Log("info", "starting workers") for i := 0; i < threadiness; i++ { wg.Add(1) go wait.Until(c.runWorker, time.Second, stopCh) @@ -151,7 +149,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG for i := 0; i < threadiness; i++ { wg.Done() } - c.logger.Log("info", "Stopping workers") + c.logger.Log("info", "stopping workers") return nil } @@ -167,11 +165,7 @@ func (c *Controller) runWorker() { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the syncHandler. func (c *Controller) processNextWorkItem() bool { - c.logger.Log("debug", "Processing next work queue job ...") - obj, shutdown := c.releaseWorkqueue.Get() - c.logger.Log("debug", fmt.Sprintf("PROCESSING item [%#v]", obj)) - if shutdown { return false } @@ -198,7 +192,7 @@ func (c *Controller) processNextWorkItem() bool { // Forget not to get into a loop of attempting to // process a work item that is invalid. c.releaseWorkqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("Expected string in workqueue but got %#v", obj)) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } @@ -206,14 +200,11 @@ func (c *Controller) processNextWorkItem() bool { // HelmRelease resource to sync the corresponding Chart release. // If the sync failed, then we return while the item will get requeued if err := c.syncHandler(key); err != nil { - return fmt.Errorf("error syncing '%s': %s", key, err.Error()) + return fmt.Errorf("errored syncing HelmRelease '%s': %s", key, err.Error()) } // If no error occurs we Forget this item so it does not // get queued again until another change happens. c.releaseWorkqueue.Forget(obj) - - c.logger.Log("info", fmt.Sprintf("Successfully synced '%s'", key)) - return nil }(obj) @@ -227,13 +218,11 @@ func (c *Controller) processNextWorkItem() bool { // syncHandler acts according to the action // Deletes/creates or updates a Chart release func (c *Controller) syncHandler(key string) error { - c.logger.Log("debug", fmt.Sprintf("Starting to sync cache key %s", key)) - // Retrieve namespace and Custom Resource name from the key namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - c.logger.Log("info", fmt.Sprintf("Invalid cache key: %v", err)) - runtime.HandleError(fmt.Errorf("Invalid cache key: %s", key)) + c.logger.Log("error", fmt.Sprintf("key '%s' is invalid: %v", key, err)) + runtime.HandleError(fmt.Errorf("key '%s' is invalid", key)) return nil } @@ -298,17 +287,23 @@ func (c *Controller) enqueueUpdateJob(old, new interface{}) { return } - c.logger.Log("info", "UPGRADING release") + log := []string{"info", "enqueuing release upgrade"} if diff := cmp.Diff(oldFhr.Spec, newFhr.Spec); diff != "" && c.logDiffs { - c.logger.Log("info", "Custom Resource driven release upgrade", "diff", diff) - } else { - c.logger.Log("info", "Custom Resource driven release upgrade") + log = append(log, "diff", diff) } + log = append(log, "resource", newFhr.ResourceID().String()) + + l := make([]interface{}, len(log)) + for i, v := range log { + l[i] = v + } + + c.logger.Log(l...) + c.enqueueJob(new) } func (c *Controller) deleteRelease(fhr flux_v1beta1.HelmRelease) { - c.logger.Log("info", "DELETING release") - c.logger.Log("info", "Custom Resource driven release deletion") + c.logger.Log("info", "deleting release", "resource", fhr.ResourceID().String()) c.sync.DeleteRelease(fhr) }