Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Create metric for flux manifest errors #2535

Merged
merged 1 commit into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/references/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,15 @@ The following metrics are exposed:
| `flux_daemon_queue_duration_seconds` | Duration of time spent in the job queue before execution
| `flux_daemon_queue_length_count` | Count of jobs waiting in the queue to be run
| `flux_daemon_sync_duration_seconds` | Duration of git-to-cluster synchronisation
| `flux_daemon_sync_manifests` | Number of manifests being synced to cluster
| `flux_registry_fetch_duration_seconds` | Duration of image metadata requests (from cache)
| `flux_fluxd_connection_duration_seconds` | Duration in seconds of the current connection to fluxsvc

Flux sync state can be obtained by using the following PromQL expressions:
* `delta(flux_daemon_sync_duration_seconds_count{success='true'}[6m]) < 1` - for general flux sync errors - if this is
true, there are usually problems with the infrastructure, manifest parse errors, or manifests
with duplicate ids.

* `flux_daemon_sync_manifests{success='false'} > 0` - for git manifest errors - if this is true, there are
problems applying git manifests to kubernetes - e.g. a configmap is too big to fit in annotations or
an immutable field (like a label selector) was changed.
7 changes: 7 additions & 0 deletions pkg/daemon/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,11 @@ var (
Name: "queue_length_count",
Help: "Count of jobs waiting in the queue to be run.",
}, []string{})

// syncManifestsMetric is a gauge exposed as flux_daemon_sync_manifests
// (namespace "flux", subsystem "daemon", name "sync_manifests"). It carries a
// single label, fluxmetrics.LabelSuccess, so that manifests counted as
// synchronized and manifests counted as failed are reported as separate series.
syncManifestsMetric = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: "flux",
Subsystem: "daemon",
Name: "sync_manifests",
Help: "Number of synchronized manifests",
}, []string{fluxmetrics.LabelSuccess})
)
9 changes: 9 additions & 0 deletions pkg/daemon/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
"github.com/fluxcd/flux/pkg/metrics"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"path/filepath"
Expand Down Expand Up @@ -157,6 +158,7 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl
switch syncerr := err.(type) {
case cluster.SyncError:
logger.Log("err", err)
updateSyncManifestsMetric(len(resources)-len(syncerr), len(syncerr))
for _, e := range syncerr {
resourceErrors = append(resourceErrors, event.ResourceError{
ID: e.ResourceID,
Expand All @@ -167,10 +169,17 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl
default:
return nil, nil, err
}
} else {
updateSyncManifestsMetric(len(resources), 0)
}
return resources, resourceErrors, nil
}

// updateSyncManifestsMetric publishes the result of a sync through the
// syncManifestsMetric gauge: the series labelled success="true" is set to
// success and the series labelled success="false" is set to failure. Both
// series are always written, so a previous value never lingers.
func updateSyncManifestsMetric(success, failure int) {
	succeeded := syncManifestsMetric.With(metrics.LabelSuccess, "true")
	succeeded.Set(float64(success))

	failed := syncManifestsMetric.With(metrics.LabelSuccess, "false")
	failed.Set(float64(failure))
}

// getChangedResources calculates what resources are modified during
// this sync.
func (d *Daemon) getChangedResources(ctx context.Context, c changeSet, timeout time.Duration, working *git.Export,
Expand Down
162 changes: 160 additions & 2 deletions pkg/daemon/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"testing"
"time"

"github.com/go-kit/kit/log"

"github.com/fluxcd/flux/pkg/cluster"
"github.com/fluxcd/flux/pkg/cluster/kubernetes"
"github.com/fluxcd/flux/pkg/cluster/kubernetes/testfiles"
Expand All @@ -26,6 +24,9 @@ import (
registryMock "github.com/fluxcd/flux/pkg/registry/mock"
"github.com/fluxcd/flux/pkg/resource"
fluxsync "github.com/fluxcd/flux/pkg/sync"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
promdto "github.com/prometheus/client_model/go"
)

const (
Expand Down Expand Up @@ -86,6 +87,50 @@ func daemon(t *testing.T) (*Daemon, func()) {
}
}

// findMetric gathers the default Prometheus registry and returns the sample of
// the metric family called name whose label values match labels.
//
// labels is a flat list of name/value pairs ("name1", "value1", "name2", ...)
// and must be given in the same order in which the gathered sample reports its
// labels. An empty labels list matches a sample that has no labels at all.
//
// An error is returned when:
//   - the registry cannot be gathered,
//   - no family called name exists,
//   - the family exists but has a type other than metricType,
//   - a sample has a different number of labels, or a different label name at
//     some position, than requested,
//   - no sample of the family carries the requested label values.
func findMetric(name string, metricType promdto.MetricType, labels ...string) (*promdto.Metric, error) {
	metricsRegistry := prometheus.DefaultRegisterer.(*prometheus.Registry)
	metrics, err := metricsRegistry.Gather()
	if err != nil {
		return nil, fmt.Errorf("Error reading metrics registry %v", err)
	}

	for _, metricFamily := range metrics {
		if *metricFamily.Name != name {
			continue
		}
		if *metricFamily.Type != metricType {
			return nil, fmt.Errorf("Metric types for %v doesn't correpond: %v != %v", name, metricFamily.Type, metricType)
		}

		for _, metric := range metricFamily.Metric {
			// labels holds name/value pairs, hence twice as many entries as
			// the sample has labels. Report the two sides exactly as compared.
			if len(labels) != len(metric.Label)*2 {
				return nil, fmt.Errorf("Metric labels length for %v doesn't correpond: %v != %v", name, len(labels), len(metric.Label)*2)
			}

			// Tracking the match outside the loop lets a label-less sample
			// match an empty labels list instead of being silently skipped.
			matched := true
			for labelIdx, label := range metric.Label {
				if labels[labelIdx*2] != *label.Name {
					return nil, fmt.Errorf("Metric label for %v doesn't correpond: %v != %v", name, labels[labelIdx*2], *label.Name)
				}
				if labels[labelIdx*2+1] != *label.Value {
					// Different label value: this is another series of the
					// same family, try the next sample.
					matched = false
					break
				}
			}
			if matched {
				return metric, nil
			}
		}
		return nil, fmt.Errorf("Can't find metric %v with appropriate labels in registry", name)
	}
	return nil, fmt.Errorf("Can't find metric %v in registry", name)
}

// checkSyncManifestsMetrics asserts the current values of the
// flux_daemon_sync_manifests gauge: the success="true" series must equal
// manifestSuccess and the success="false" series must equal manifestFailures.
// Lookup failures and mismatches are reported through t.Errorf, so both series
// are always checked.
func checkSyncManifestsMetrics(t *testing.T, manifestSuccess, manifestFailures int) {
	succeeded, succeededErr := findMetric("flux_daemon_sync_manifests", promdto.MetricType_GAUGE, "success", "true")
	switch {
	case succeededErr != nil:
		t.Errorf("Error collecting flux_daemon_sync_manifests{success='true'} metric: %v", succeededErr)
	case int(*succeeded.Gauge.Value) != manifestSuccess:
		t.Errorf("flux_daemon_sync_manifests{success='true'} must be %v. Got %v", manifestSuccess, *succeeded.Gauge.Value)
	}

	failed, failedErr := findMetric("flux_daemon_sync_manifests", promdto.MetricType_GAUGE, "success", "false")
	switch {
	case failedErr != nil:
		t.Errorf("Error collecting flux_daemon_sync_manifests{success='false'} metric: %v", failedErr)
	case int(*failed.Gauge.Value) != manifestFailures:
		t.Errorf("flux_daemon_sync_manifests{success='false'} must be %v. Got %v", manifestFailures, *failed.Gauge.Value)
	}
}

func TestPullAndSync_InitialSync(t *testing.T) {
d, cleanup := daemon(t)
defer cleanup()
Expand Down Expand Up @@ -148,6 +193,9 @@ func TestPullAndSync_InitialSync(t *testing.T) {
} else if len(revs) <= 0 {
t.Errorf("Found no revisions before the sync tag")
}

// Check 0 error stats
checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)
}

func TestDoSync_NoNewCommits(t *testing.T) {
Expand Down Expand Up @@ -357,3 +405,113 @@ func TestDoSync_WithNewCommit(t *testing.T) {
t.Errorf("Should have moved sync tag to HEAD (%s), but was moved to: %s", newRevision, revs[len(revs)-1].Revision)
}
}

// TestDoSync_WithErrors walks the sync-manifests gauge through four stages and
// checks its values after each one:
//
//  1. a clean initial sync: every test resource counted as success, 0 failures;
//  2. a commit adding an unparseable manifest: Sync must return an error and
//     the gauge must keep the values from stage 1;
//  3. a commit replacing that manifest with a comment: Sync succeeds again;
//  4. the cluster mock reporting a cluster.SyncError with two entries: Sync
//     still returns nil, and two manifests move from success to failure.
func TestDoSync_WithErrors(t *testing.T) {
	d, cleanup := daemon(t)
	defer cleanup()

	expectedResourceIDs := resource.IDs{}
	for id := range testfiles.ResourceMap {
		expectedResourceIDs = append(expectedResourceIDs, id)
	}

	k8s.SyncFunc = func(def cluster.SyncSet) error {
		return nil
	}

	ctx := context.Background()
	head, err := d.Repo.BranchHead(ctx)
	if err != nil {
		t.Fatal(err)
	}

	// commitManifest overwrites error_manifest.yaml in a working clone with
	// content, then commits and pushes it using message as the commit message.
	commitManifest := func(content, message string) error {
		return d.WithWorkingClone(ctx, func(checkout *git.Checkout) error {
			ctx, cancel := context.WithTimeout(ctx, 5000*time.Second)
			defer cancel()

			absolutePath := path.Join(checkout.Dir(), "error_manifest.yaml")
			if err := ioutil.WriteFile(absolutePath, []byte(content), 0600); err != nil {
				return err
			}
			commitAction := git.CommitAction{Author: "", Message: message}
			return checkout.CommitAndPush(ctx, commitAction, nil, true)
		})
	}

	syncTag := "sync"
	gitSync, _ := fluxsync.NewGitTagSyncProvider(d.Repo, syncTag, "", false, d.GitConfig)
	syncState := &lastKnownSyncState{logger: d.Logger, state: gitSync}

	if err := d.Sync(ctx, time.Now().UTC(), head, syncState); err != nil {
		t.Error(err)
	}

	// Check 0 error stats
	checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)

	// Now add wrong manifest
	if err := commitManifest("Manifest that must produce errors", "test error commit"); err != nil {
		t.Fatal(err)
	}

	if err := d.Repo.Refresh(ctx); err != nil {
		t.Error(err)
	}

	if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err == nil {
		t.Error("Sync must fail because of invalid manifest")
	} else {
		// Check error not nil, manifest counters remain the same
		checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)
	}

	// Fix manifest
	if err := commitManifest("# Just comment", "test fix commit"); err != nil {
		t.Fatal(err)
	}

	if err := d.Repo.Refresh(ctx); err != nil {
		t.Error(err)
	}

	if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
		t.Error(err)
	}
	// Check 0 manifest error stats
	checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)

	// Emulate sync errors
	k8s.SyncFunc = func(def cluster.SyncSet) error {
		return cluster.SyncError{
			cluster.ResourceError{resource.MustParseID("mynamespace:deployment/depl1"), "src1", fmt.Errorf("Error1")},
			cluster.ResourceError{resource.MustParseID("mynamespace:deployment/depl2"), "src2", fmt.Errorf("Error2")},
		}
	}

	if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
		t.Error(err)
	}

	// Check 2 sync error in stats
	checkSyncManifestsMetrics(t, len(expectedResourceIDs)-2, 2)
}