Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Showing 5 changed files with 65 additions and 62 deletions.
6 changes: 3 additions & 3 deletions cmd/helm-operator/main.go
Original file line number Diff line number Diff line change
@@ -137,19 +137,19 @@ func main() {

cfg, err := clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
if err != nil {
mainLogger.Log("error", fmt.Sprintf("Error building kubeconfig: %v", err))
mainLogger.Log("error", fmt.Sprintf("error building kubeconfig: %v", err))
os.Exit(1)
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
mainLogger.Log("error", fmt.Sprintf("Error building kubernetes clientset: %v", err))
mainLogger.Log("error", fmt.Sprintf("error building kubernetes clientset: %v", err))
os.Exit(1)
}

ifClient, err := clientset.NewForConfig(cfg)
if err != nil {
mainLogger.Log("error", fmt.Sprintf("Error building integrations clientset: %v", err))
mainLogger.Log("error", fmt.Sprintf("error building integrations clientset: %v", err))
os.Exit(1)
}

59 changes: 33 additions & 26 deletions integrations/helm/chartsync/chartsync.go
Original file line number Diff line number Diff line change
@@ -149,7 +149,7 @@ func New(logger log.Logger, clients Clients, release *release.Release, releaseQu
// Helm releases in the cluster, what HelmRelease declare, and
// changes in the git repos mentioned by any HelmRelease.
func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *sync.WaitGroup) {
chs.logger.Log("info", "Starting git chart sync loop")
chs.logger.Log("info", "starting git chart sync loop")

wg.Add(1)
go func() {
@@ -169,27 +169,32 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
continue
}

// Retrieve the mirror we got a change signal for
repo, ok := chs.mirrors.Get(mirror)
if !ok {
// Then why .. did you say .. it had changed? It may have been removed. Add it back and let it signal again.
chs.logger.Log("warning", "mirrored git repo disappeared after signalling change", "repo", mirror)
for _, fhr := range resources {
// Then why .. did you say .. it had changed? It may have been removed. Add it back and let it signal again.
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git mirror missing; starting mirroring again")
chs.logger.Log("warning", "mirrored git repo disappeared after signalling change", "repo", mirror)
chs.maybeMirror(fhr)
}
continue
}

// Ensure the repo is ready
status, err := repo.Status()
if status != git.RepoReady {
chs.logger.Log("info", "repo not ready yet, while attempting chart sync", "repo", mirror, "status", string(status))
for _, fhr := range resources {
chs.logger.Log("info", "repo not ready yet, while attempting chart sync", "repo", mirror, "status", string(status))
// TODO(michael) log if there's a problem with the following?
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, err.Error())
}
continue
}

// Determine if we need to update the clone and
// schedule an upgrade for every HelmRelease that
// makes use of the mirror
for _, fhr := range resources {
ref := fhr.Spec.ChartSource.GitChartSource.RefOrDefault()
path := fhr.Spec.ChartSource.GitChartSource.Path
@@ -200,7 +205,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
cancel()
if err != nil {
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error())
chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", mirror, "ref", ref, "err", err)
chs.logger.Log("warning", "could not get revision for ref while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err)
continue
}

@@ -216,7 +221,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
cancel()
if err != nil {
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error())
chs.logger.Log("warning", "could not get revision for ref while checking for changes", "repo", mirror, "ref", ref, "err", err)
chs.logger.Log("warning", "could not get revision for ref while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err)
continue
}
ok = len(commits) == 0
@@ -228,7 +233,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
cancel()
if err != nil {
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonGitNotReady, "problem cloning from local git mirror: "+err.Error())
chs.logger.Log("warning", "could not clone from mirror while checking for changes", "repo", mirror, "ref", ref, "err", err)
chs.logger.Log("warning", "could not clone from mirror while checking for changes", "resource", fhr.ResourceID().String(), "repo", mirror, "ref", ref, "err", err)
continue
}
newCloneForChart := clone{remote: mirror, ref: ref, head: refHead, export: newClone}
@@ -240,10 +245,12 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
}
}

// Enqueue release
cacheKey, err := cache.MetaNamespaceKeyFunc(fhr.GetObjectMeta())
if err != nil {
continue
}
chs.logger.Log("info", "enqueing release upgrade due to change in git chart source", "resource", fhr.ResourceID().String())
chs.releaseQueue.AddRateLimited(cacheKey)
}
}
@@ -325,12 +332,12 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) {
if !ok {
chs.maybeMirror(fhr)
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo "+chartSource.GitURL+" not mirrored yet")
chs.logger.Log("info", "chart repo not cloned yet", "releaseName", releaseName, "resource", fmt.Sprintf("%s:%s/%s", fhr.Namespace, fhr.Kind, fhr.Name))
chs.logger.Log("info", "chart repo not cloned yet", "resource", fhr.ResourceID().String())
} else {
status, err := repo.Status()
if status != git.RepoReady {
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionUnknown, ReasonGitNotReady, "git repo not mirrored yet: "+err.Error())
chs.logger.Log("info", "chart repo not ready yet", "releaseName", releaseName, "resource", fmt.Sprintf("%s:%s/%s", fhr.Namespace, fhr.Kind, fhr.Name), "status", string(status), "err", err)
chs.logger.Log("info", "chart repo not ready yet", "resource", fhr.ResourceID().String(), "status", string(status), "err", err)
}
}
return
@@ -342,7 +349,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) {
if chs.config.UpdateDeps && !fhr.Spec.ChartSource.GitChartSource.SkipDepUpdate {
if err := updateDependencies(chartPath, ""); err != nil {
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonDependencyFailed, err.Error())
chs.logger.Log("warning", "Failed to update chart dependencies", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
chs.logger.Log("warning", "failed to update chart dependencies", "resource", fhr.ResourceID().String(), "err", err)
return
}
}
@@ -351,7 +358,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) {
path, err := ensureChartFetched(chs.config.ChartCache, chartSource)
if err != nil {
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionFalse, ReasonDownloadFailed, "chart download failed: "+err.Error())
chs.logger.Log("info", "chart download failed", "releaseName", releaseName, "resource", fhr.ResourceID().String(), "err", err)
chs.logger.Log("info", "chart download failed", "resource", fhr.ResourceID().String(), "err", err)
return
}
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseChartFetched, v1.ConditionTrue, ReasonDownloaded, "chart fetched: "+filepath.Base(path))
@@ -363,7 +370,7 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) {
_, err := chs.release.Install(chartPath, releaseName, fhr, release.InstallAction, opts, &chs.kubeClient)
if err != nil {
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonInstallFailed, err.Error())
chs.logger.Log("warning", "Failed to install chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
chs.logger.Log("warning", "failed to install chart", "resource", fhr.ResourceID().String(), "err", err)
return
}
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm install succeeded")
@@ -375,28 +382,28 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) {

changed, err := chs.shouldUpgrade(chartPath, rel, fhr)
if err != nil {
chs.logger.Log("warning", "Unable to determine if release has changed", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
chs.logger.Log("warning", "unable to determine if release has changed", "resource", fhr.ResourceID().String(), "err", err)
return
}
if changed {
cFhr, err := chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace).Get(fhr.Name, metav1.GetOptions{})
if err != nil {
chs.logger.Log("warning", "Failed to retrieve HelmRelease scheduled for upgrade", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
chs.logger.Log("warning", "failed to retrieve HelmRelease scheduled for upgrade", "resource", fhr.ResourceID().String(), "err", err)
return
}
if diff := cmp.Diff(fhr.Spec, cFhr.Spec); diff != "" {
chs.logger.Log("warning", "HelmRelease spec has diverged since we calculated if we should upgrade, skipping upgrade", "namespace", fhr.Namespace, "name", fhr.Name)
chs.logger.Log("warning", "HelmRelease spec has diverged since we calculated if we should upgrade, skipping upgrade", "resource", fhr.ResourceID().String())
return
}
_, err = chs.release.Install(chartPath, releaseName, fhr, release.UpgradeAction, opts, &chs.kubeClient)
if err != nil {
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionFalse, ReasonUpgradeFailed, err.Error())
chs.logger.Log("warning", "Failed to upgrade chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
chs.logger.Log("warning", "failed to upgrade chart", "resource", fhr.ResourceID().String(), "err", err)
return
}
chs.setCondition(&fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm upgrade succeeded")
if err = status.UpdateReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil {
chs.logger.Log("warning", "could not update the release revision", "namespace", fhr.Namespace, "resource", fhr.Name, "err", err)
chs.logger.Log("warning", "could not update the release revision", "resource", fhr.ResourceID().String(), "err", err)
}
return
}
@@ -410,7 +417,7 @@ func (chs *ChartChangeSync) DeleteRelease(fhr fluxv1beta1.HelmRelease) {
name := release.GetReleaseName(fhr)
err := chs.release.Delete(name)
if err != nil {
chs.logger.Log("warning", "Chart release not deleted", "release", name, "error", err)
chs.logger.Log("warning", "chart release not deleted", "resource", fhr.ResourceID().String(), "release", name, "err", err)
}

// Remove the clone we may have for this HelmRelease
@@ -427,11 +434,11 @@ func (chs *ChartChangeSync) DeleteRelease(fhr fluxv1beta1.HelmRelease) {

// SyncMirrors instructs all mirrors to refresh from their upstream.
func (chs *ChartChangeSync) SyncMirrors() {
chs.logger.Log("info", "Starting mirror sync")
chs.logger.Log("info", "starting mirror sync")
for _, err := range chs.mirrors.RefreshAll(chs.config.GitTimeout) {
chs.logger.Log("error", fmt.Sprintf("Failure while syncing mirror: %s", err))
chs.logger.Log("error", fmt.Sprintf("failure while syncing mirror: %s", err))
}
chs.logger.Log("info", "Finished syncing mirrors")
chs.logger.Log("info", "finished syncing mirrors")
}

// getCustomResourcesForMirror retrieves all the resources that make
@@ -521,7 +528,7 @@ func sortChartFields(c *hapi_chart.Chart) *hapi_chart.Chart {
// doing a dry run install from the chart in the git repo.
func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_release.Release, fhr fluxv1beta1.HelmRelease) (bool, error) {
if currRel == nil {
return false, fmt.Errorf("No Chart release provided for %v", fhr.GetName())
return false, fmt.Errorf("no chart release provided for %v", fhr.GetName())
}

currVals := currRel.GetConfig()
@@ -540,18 +547,18 @@ func (chs *ChartChangeSync) shouldUpgrade(chartsRepo string, currRel *hapi_relea
// compare values && Chart
if diff := cmp.Diff(currVals, desVals); diff != "" {
if chs.config.LogDiffs {
chs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName()), "diff", diff)
chs.logger.Log("error", fmt.Sprintf("release %s: values have diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String(), "diff", diff)
} else {
chs.logger.Log("error", fmt.Sprintf("Release %s: values have diverged due to manual Chart release", currRel.GetName()))
chs.logger.Log("error", fmt.Sprintf("release %s: values have diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String())
}
return true, nil
}

if diff := cmp.Diff(sortChartFields(currChart), sortChartFields(desChart)); diff != "" {
if chs.config.LogDiffs {
chs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName()), "diff", diff)
chs.logger.Log("error", fmt.Sprintf("release %s: chart has diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String(), "diff", diff)
} else {
chs.logger.Log("error", fmt.Sprintf("Release %s: Chart has diverged due to manual Chart release", currRel.GetName()))
chs.logger.Log("error", fmt.Sprintf("release %s: chart has diverged due to manual chart release", currRel.GetName()), "resource", fhr.ResourceID().String())
}
return true, nil
}
2 changes: 1 addition & 1 deletion integrations/helm/helm.go
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ func ClientSetup(logger log.Logger, kubeClient *kubernetes.Clientset, tillerOpts
for {
helmClient, host, err = newClient(kubeClient, tillerOpts)
if err != nil {
logger.Log("error", fmt.Sprintf("Error creating helm client: %s", err.Error()))
logger.Log("error", fmt.Sprintf("error creating helm client: %s", err.Error()))
time.Sleep(20 * time.Second)
continue
}
9 changes: 5 additions & 4 deletions integrations/helm/http/daemon/server.go
Original file line number Diff line number Diff line change
@@ -3,14 +3,15 @@ package daemon
import (
"context"
"fmt"
"net/http"
"sync/atomic"
"time"

"github.com/go-kit/kit/log"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/weaveworks/flux/integrations/helm/api"
transport "github.com/weaveworks/flux/integrations/helm/http"
"net/http"
"sync/atomic"
"time"
)

// ListenAndServe starts a HTTP server instrumented with Prometheus metrics,
@@ -37,7 +38,7 @@ func ListenAndServe(listenAddr string, apiServer api.Server, logger log.Logger,
IdleTimeout: 15 * time.Second,
}

logger.Log("info", fmt.Sprintf("Starting HTTP server on %s", listenAddr))
logger.Log("info", fmt.Sprintf("starting HTTP server on %s", listenAddr))

// run server in background
go func() {
51 changes: 23 additions & 28 deletions integrations/helm/operator/operator.go
Original file line number Diff line number Diff line change
@@ -97,13 +97,11 @@ func New(
sync: sync,
}

controller.logger.Log("info", "Setting up event handlers")
controller.logger.Log("info", "setting up event handlers")

// ----- EVENT HANDLERS for HelmRelease resources change ---------
fhrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) {
controller.logger.Log("info", "CREATING release")
controller.logger.Log("info", "Custom Resource driven release install")
_, ok := checkCustomResourceType(controller.logger, new)
if ok {
controller.enqueueJob(new)
@@ -119,7 +117,7 @@ func New(
}
},
})
controller.logger.Log("info", "Event handlers set up")
controller.logger.Log("info", "event handlers set up")

return controller
}
@@ -132,16 +130,16 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG
defer runtime.HandleCrash()
defer c.releaseWorkqueue.ShutDown()

c.logger.Log("info", "Starting operator")
c.logger.Log("info", "starting operator")
// Wait for the caches to be synced before starting workers
c.logger.Log("info", "Waiting for informer caches to sync")
c.logger.Log("info", "waiting for informer caches to sync")

if ok := cache.WaitForCacheSync(stopCh, c.fhrSynced); !ok {
return errors.New("failed to wait for caches to sync")
}
c.logger.Log("info", "Informer caches synced")
c.logger.Log("info", "unformer caches synced")

c.logger.Log("info", "Starting workers")
c.logger.Log("info", "starting workers")
for i := 0; i < threadiness; i++ {
wg.Add(1)
go wait.Until(c.runWorker, time.Second, stopCh)
@@ -151,7 +149,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG
for i := 0; i < threadiness; i++ {
wg.Done()
}
c.logger.Log("info", "Stopping workers")
c.logger.Log("info", "stopping workers")

return nil
}
@@ -167,11 +165,7 @@ func (c *Controller) runWorker() {
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
c.logger.Log("debug", "Processing next work queue job ...")

obj, shutdown := c.releaseWorkqueue.Get()
c.logger.Log("debug", fmt.Sprintf("PROCESSING item [%#v]", obj))

if shutdown {
return false
}
@@ -198,22 +192,19 @@ func (c *Controller) processNextWorkItem() bool {
// Forget not to get into a loop of attempting to
// process a work item that is invalid.
c.releaseWorkqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("Expected string in workqueue but got %#v", obj))
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))

return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// HelmRelease resource to sync the corresponding Chart release.
// If the sync failed, then we return while the item will get requeued
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
return fmt.Errorf("errored syncing HelmRelease '%s': %s", key, err.Error())
}
// If no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.releaseWorkqueue.Forget(obj)

c.logger.Log("info", fmt.Sprintf("Successfully synced '%s'", key))

return nil
}(obj)

@@ -227,13 +218,11 @@ func (c *Controller) processNextWorkItem() bool {
// syncHandler acts according to the action
// Deletes/creates or updates a Chart release
func (c *Controller) syncHandler(key string) error {
c.logger.Log("debug", fmt.Sprintf("Starting to sync cache key %s", key))

// Retrieve namespace and Custom Resource name from the key
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
c.logger.Log("info", fmt.Sprintf("Invalid cache key: %v", err))
runtime.HandleError(fmt.Errorf("Invalid cache key: %s", key))
c.logger.Log("error", fmt.Sprintf("key '%s' is invalid: %v", key, err))
runtime.HandleError(fmt.Errorf("key '%s' is invalid", key))
return nil
}

@@ -298,17 +287,23 @@ func (c *Controller) enqueueUpdateJob(old, new interface{}) {
return
}

c.logger.Log("info", "UPGRADING release")
log := []string{"info", "enqueuing release upgrade"}
if diff := cmp.Diff(oldFhr.Spec, newFhr.Spec); diff != "" && c.logDiffs {
c.logger.Log("info", "Custom Resource driven release upgrade", "diff", diff)
} else {
c.logger.Log("info", "Custom Resource driven release upgrade")
log = append(log, "diff", diff)
}
log = append(log, "resource", newFhr.ResourceID().String())

l := make([]interface{}, len(log))
for i, v := range log {
l[i] = v
}

c.logger.Log(l...)

c.enqueueJob(new)
}

func (c *Controller) deleteRelease(fhr flux_v1beta1.HelmRelease) {
c.logger.Log("info", "DELETING release")
c.logger.Log("info", "Custom Resource driven release deletion")
c.logger.Log("info", "deleting release", "resource", fhr.ResourceID().String())
c.sync.DeleteRelease(fhr)
}

0 comments on commit a02f152

Please sign in to comment.