Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Return sync errors in ListServices #1410

Merged
merged 3 commits into from
Oct 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type Controller struct {
Antecedent flux.ResourceID
Labels map[string]string
Rollout RolloutStatus
// Errors during the recurring sync from the Git repository to the
// cluster will surface here.
SyncError error

Containers ContainersOrExcuse
}
Expand Down
34 changes: 30 additions & 4 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ type Cluster struct {
logger log.Logger
sshKeyRing ssh.KeyRing

// syncErrors keeps a record of all per-resource errors during
// the sync from Git repo to the cluster.
syncErrors map[flux.ResourceID]error
muSyncErrors sync.RWMutex

nsWhitelist []string
nsWhitelistLogged map[string]bool // to keep track of whether we've logged a problem with seeing a whitelisted ns

Expand Down Expand Up @@ -146,6 +151,9 @@ func (c *Cluster) SomeControllers(ids []flux.ResourceID) (res []cluster.Controll
}

if !isAddon(podController) {
c.muSyncErrors.RLock()
podController.syncError = c.syncErrors[id]
c.muSyncErrors.RUnlock()
controllers = append(controllers, podController.toClusterController(id))
}
}
Expand Down Expand Up @@ -180,6 +188,9 @@ func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, er
for _, podController := range podControllers {
if !isAddon(podController) {
id := flux.MakeResourceID(ns.Name, kind, podController.name)
c.muSyncErrors.RLock()
podController.syncError = c.syncErrors[id]
c.muSyncErrors.RUnlock()
allControllers = append(allControllers, podController.toClusterController(id))
}
}
Expand Down Expand Up @@ -221,15 +232,30 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {

c.mu.Lock()
defer c.mu.Unlock()
if applyErrs := c.applier.apply(logger, cs); len(applyErrs) > 0 {
c.muSyncErrors.RLock()
if applyErrs := c.applier.apply(logger, cs, c.syncErrors); len(applyErrs) > 0 {
errs = append(errs, applyErrs...)
}
c.muSyncErrors.RUnlock()

// If `nil`, errs is a cluster.SyncError(nil) rather than error(nil)
if errs != nil {
return errs
if errs == nil {
return nil
}

// It is expected that Cluster.Sync is invoked with *all* resources.
// Otherwise it will override previously recorded sync errors.
c.setSyncErrors(errs)
return errs
}

func (c *Cluster) setSyncErrors(errs cluster.SyncError) {
c.muSyncErrors.Lock()
defer c.muSyncErrors.Unlock()
c.syncErrors = make(map[flux.ResourceID]error)
for _, e := range errs {
c.syncErrors[e.ResourceID()] = e.Error
}
return nil
}

func (c *Cluster) Ping() error {
Expand Down
2 changes: 2 additions & 0 deletions cluster/kubernetes/resourcekinds.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type podController struct {
name string
status string
rollout cluster.RolloutStatus
syncError error
podTemplate apiv1.PodTemplateSpec
}

Expand Down Expand Up @@ -86,6 +87,7 @@ func (pc podController) toClusterController(resourceID flux.ResourceID) cluster.
ID: resourceID,
Status: pc.status,
Rollout: pc.rollout,
SyncError: pc.syncError,
Antecedent: antecedent,
Labels: pc.GetLabels(),
Containers: cluster.ContainersOrExcuse{Containers: clusterContainers, Excuse: excuse},
Expand Down
32 changes: 26 additions & 6 deletions cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
)

Expand All @@ -30,7 +31,7 @@ func (c *changeSet) stage(cmd string, o *apiObject) {

// Applier is something that will apply a changeset to the cluster.
type Applier interface {
apply(log.Logger, changeSet) cluster.SyncError
apply(log.Logger, changeSet, map[flux.ResourceID]error) cluster.SyncError
}

type Kubectl struct {
Expand Down Expand Up @@ -113,21 +114,40 @@ func (objs applyOrder) Less(i, j int) bool {
return ranki < rankj
}

func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError) {
func (c *Kubectl) apply(logger log.Logger, cs changeSet, errored map[flux.ResourceID]error) (errs cluster.SyncError) {
f := func(objs []*apiObject, cmd string, args ...string) {
if len(objs) == 0 {
return
}
logger.Log("cmd", cmd, "args", strings.Join(args, " "), "count", len(objs))
args = append(args, cmd)
if err := c.doCommand(logger, makeMultidoc(objs), args...); err != nil {

var multi, single []*apiObject
if len(errored) == 0 {
multi = objs
} else {
for _, obj := range objs {
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs = append(errs, cluster.ResourceError{obj.Resource, err})
if _, ok := errored[obj.ResourceID()]; ok {
// Resources that errored before shall be applied separately
single = append(single, obj)
} else {
// everything else will be tried in a multidoc apply.
multi = append(multi, obj)
}
}
}

if len(multi) > 0 {
if err := c.doCommand(logger, makeMultidoc(multi), args...); err != nil {
single = append(single, multi...)
}
}
for _, obj := range single {
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs = append(errs, cluster.ResourceError{obj.Resource, err})
}
}
}

// When deleting objects, the only real concern is that we don't
Expand Down
4 changes: 2 additions & 2 deletions cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type mockApplier struct {
commandRun bool
}

func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncError {
func (m *mockApplier) apply(_ log.Logger, c changeSet, errored map[flux.ResourceID]error) cluster.SyncError {
if len(c.objs) != 0 {
m.commandRun = true
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestSyncMalformed(t *testing.T) {
err := kube.Sync(cluster.SyncDef{
Actions: []cluster.SyncAction{
cluster.SyncAction{
Apply: rsc{"id", []byte("garbage")},
Apply: rsc{"default:deployment/trash", []byte("garbage")},
},
},
})
Expand Down
7 changes: 7 additions & 0 deletions daemon/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@ package daemon

import (
"fmt"
"sync"

"github.com/weaveworks/flux"
fluxerr "github.com/weaveworks/flux/errors"
"github.com/weaveworks/flux/job"
)

type SyncErrors struct {
errs map[flux.ResourceID]error
mu sync.Mutex
}

func manifestLoadError(reason error) error {
return &fluxerr.Error{
Type: fluxerr.User,
Expand Down
8 changes: 4 additions & 4 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,14 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
return errors.Wrap(err, "loading resources from repo")
}

var syncErrors []event.ResourceError
var resourceErrors []event.ResourceError
// TODO supply deletes argument from somewhere (command-line?)
if err := fluxsync.Sync(d.Manifests, allResources, d.Cluster, false, logger); err != nil {
if err := fluxsync.Sync(logger, d.Manifests, allResources, d.Cluster, false); err != nil {
logger.Log("err", err)
switch syncerr := err.(type) {
case cluster.SyncError:
for _, e := range syncerr {
syncErrors = append(syncErrors, event.ResourceError{
resourceErrors = append(resourceErrors, event.ResourceError{
ID: e.ResourceID(),
Path: e.Source(),
Error: e.Error.Error(),
Expand Down Expand Up @@ -368,7 +368,7 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) {
Commits: cs,
InitialSync: initialSync,
Includes: includes,
Errors: syncErrors,
Errors: resourceErrors,
},
}); err != nil {
logger.Log("err", err)
Expand Down
3 changes: 2 additions & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

// Sync synchronises the cluster to the files in a directory
func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, deletes bool, logger log.Logger) error {
func Sync(logger log.Logger, m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster,
deletes bool) error {
// Get a map of resources defined in the cluster
clusterBytes, err := clus.Export()

Expand Down
4 changes: 2 additions & 2 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSync(t *testing.T) {
t.Fatal(err)
}

if err := Sync(manifests, resources, clus, true, log.NewNopLogger()); err != nil {
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil {
t.Fatal(err)
}
checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)
Expand All @@ -60,7 +60,7 @@ func TestSync(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := Sync(manifests, resources, clus, true, log.NewNopLogger()); err != nil {
if err := Sync(log.NewNopLogger(), manifests, resources, clus, true); err != nil {
t.Fatal(err)
}
checkClusterMatchesFiles(t, manifests, clus, checkout.Dir(), dirs)
Expand Down