Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #2535 from pulsepointinc/errors_metrics
Browse files Browse the repository at this point in the history
Create metric for flux manifest errors
  • Loading branch information
squaremo authored Oct 24, 2019
2 parents 874f1e9 + 5c026fa commit 28db2f5
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 2 deletions.
10 changes: 10 additions & 0 deletions docs/references/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,15 @@ The following metrics are exposed:
| `flux_daemon_queue_duration_seconds` | Duration of time spent in the job queue before execution
| `flux_daemon_queue_length_count` | Count of jobs waiting in the queue to be run
| `flux_daemon_sync_duration_seconds` | Duration of git-to-cluster synchronisation
| `flux_daemon_sync_manifests` | Number of manifests being synced to cluster
| `flux_registry_fetch_duration_seconds` | Duration of image metadata requests (from cache)
| `flux_fluxd_connection_duration_seconds` | Duration in seconds of the current connection to fluxsvc

Flux sync state can be obtained by using the following PromQL expressions:
* `delta(flux_daemon_sync_duration_seconds_count{success='true'}[6m]) < 1` - for general flux sync errors - usually if
that is true then there are some problems with infrastructure or there are manifests parse error or there are manifests
with duplicate ids.

* `flux_daemon_sync_manifests{success='false'} > 0` - for git manifests errors - if true then there are either some
problems with applying git manifests to kubernetes - e.g. configmap size is too big to fit in annotations or
immutable field (like label selector) was changed.
7 changes: 7 additions & 0 deletions pkg/daemon/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,11 @@ var (
Name: "queue_length_count",
Help: "Count of jobs waiting in the queue to be run.",
}, []string{})

syncManifestsMetric = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: "flux",
Subsystem: "daemon",
Name: "sync_manifests",
Help: "Number of synchronized manifests",
}, []string{fluxmetrics.LabelSuccess})
)
9 changes: 9 additions & 0 deletions pkg/daemon/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
"github.com/fluxcd/flux/pkg/metrics"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"path/filepath"
Expand Down Expand Up @@ -157,6 +158,7 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl
switch syncerr := err.(type) {
case cluster.SyncError:
logger.Log("err", err)
updateSyncManifestsMetric(len(resources)-len(syncerr), len(syncerr))
for _, e := range syncerr {
resourceErrors = append(resourceErrors, event.ResourceError{
ID: e.ResourceID,
Expand All @@ -167,10 +169,17 @@ func doSync(ctx context.Context, manifestsStore manifests.Store, clus cluster.Cl
default:
return nil, nil, err
}
} else {
updateSyncManifestsMetric(len(resources), 0)
}
return resources, resourceErrors, nil
}

func updateSyncManifestsMetric(success, failure int) {
syncManifestsMetric.With(metrics.LabelSuccess, "true").Set(float64(success))
syncManifestsMetric.With(metrics.LabelSuccess, "false").Set(float64(failure))
}

// getChangedResources calculates what resources are modified during
// this sync.
func (d *Daemon) getChangedResources(ctx context.Context, c changeSet, timeout time.Duration, working *git.Export,
Expand Down
162 changes: 160 additions & 2 deletions pkg/daemon/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"testing"
"time"

"github.com/go-kit/kit/log"

"github.com/fluxcd/flux/pkg/cluster"
"github.com/fluxcd/flux/pkg/cluster/kubernetes"
"github.com/fluxcd/flux/pkg/cluster/kubernetes/testfiles"
Expand All @@ -26,6 +24,9 @@ import (
registryMock "github.com/fluxcd/flux/pkg/registry/mock"
"github.com/fluxcd/flux/pkg/resource"
fluxsync "github.com/fluxcd/flux/pkg/sync"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
promdto "github.com/prometheus/client_model/go"
)

const (
Expand Down Expand Up @@ -86,6 +87,50 @@ func daemon(t *testing.T) (*Daemon, func()) {
}
}

func findMetric(name string, metricType promdto.MetricType, labels ...string) (*promdto.Metric, error) {
metricsRegistry := prometheus.DefaultRegisterer.(*prometheus.Registry)
if metrics, err := metricsRegistry.Gather(); err == nil {
for _, metricFamily := range metrics {
if *metricFamily.Name == name {
if *metricFamily.Type != metricType {
return nil, fmt.Errorf("Metric types for %v doesn't correpond: %v != %v", name, metricFamily.Type, metricType)
}
for _, metric := range metricFamily.Metric {
if len(labels) != len(metric.Label)*2 {
return nil, fmt.Errorf("Metric labels length for %v doesn't correpond: %v != %v", name, len(labels)*2, len(metric.Label))
}
for labelIdx, label := range metric.Label {
if labels[labelIdx*2] != *label.Name {
return nil, fmt.Errorf("Metric label for %v doesn't correpond: %v != %v", name, labels[labelIdx*2], *label.Name)
} else if labels[labelIdx*2+1] != *label.Value {
break
} else if labelIdx == len(metric.Label)-1 {
return metric, nil
}
}
}
return nil, fmt.Errorf("Can't find metric %v with appropriate labels in registry", name)
}
}
return nil, fmt.Errorf("Can't find metric %v in registry", name)
} else {
return nil, fmt.Errorf("Error reading metrics registry %v", err)
}
}

func checkSyncManifestsMetrics(t *testing.T, manifestSuccess, manifestFailures int) {
if metric, err := findMetric("flux_daemon_sync_manifests", promdto.MetricType_GAUGE, "success", "true"); err != nil {
t.Errorf("Error collecting flux_daemon_sync_manifests{success='true'} metric: %v", err)
} else if int(*metric.Gauge.Value) != manifestSuccess {
t.Errorf("flux_daemon_sync_manifests{success='true'} must be %v. Got %v", manifestSuccess, *metric.Gauge.Value)
}
if metric, err := findMetric("flux_daemon_sync_manifests", promdto.MetricType_GAUGE, "success", "false"); err != nil {
t.Errorf("Error collecting flux_daemon_sync_manifests{success='false'} metric: %v", err)
} else if int(*metric.Gauge.Value) != manifestFailures {
t.Errorf("flux_daemon_sync_manifests{success='false'} must be %v. Got %v", manifestFailures, *metric.Gauge.Value)
}
}

func TestPullAndSync_InitialSync(t *testing.T) {
d, cleanup := daemon(t)
defer cleanup()
Expand Down Expand Up @@ -148,6 +193,9 @@ func TestPullAndSync_InitialSync(t *testing.T) {
} else if len(revs) <= 0 {
t.Errorf("Found no revisions before the sync tag")
}

// Check 0 error stats
checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)
}

func TestDoSync_NoNewCommits(t *testing.T) {
Expand Down Expand Up @@ -357,3 +405,113 @@ func TestDoSync_WithNewCommit(t *testing.T) {
t.Errorf("Should have moved sync tag to HEAD (%s), but was moved to: %s", newRevision, revs[len(revs)-1].Revision)
}
}

func TestDoSync_WithErrors(t *testing.T) {
d, cleanup := daemon(t)
defer cleanup()

expectedResourceIDs := resource.IDs{}
for id, _ := range testfiles.ResourceMap {
expectedResourceIDs = append(expectedResourceIDs, id)
}

k8s.SyncFunc = func(def cluster.SyncSet) error {
return nil
}

ctx := context.Background()
head, err := d.Repo.BranchHead(ctx)
if err != nil {
t.Fatal(err)
}

syncTag := "sync"
gitSync, _ := fluxsync.NewGitTagSyncProvider(d.Repo, syncTag, "", false, d.GitConfig)
syncState := &lastKnownSyncState{logger: d.Logger, state: gitSync}

if err := d.Sync(ctx, time.Now().UTC(), head, syncState); err != nil {
t.Error(err)
}

// Check 0 error stats
checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)

// Now add wrong manifest
err = d.WithWorkingClone(ctx, func(checkout *git.Checkout) error {
ctx, cancel := context.WithTimeout(ctx, 5000*time.Second)
defer cancel()

absolutePath := path.Join(checkout.Dir(), "error_manifest.yaml")
if err := ioutil.WriteFile(absolutePath, []byte("Manifest that must produce errors"), 0600); err != nil {
return err
}
commitAction := git.CommitAction{Author: "", Message: "test error commit"}
err = checkout.CommitAndPush(ctx, commitAction, nil, true)
if err != nil {
return err
}
return err
})
if err != nil {
t.Fatal(err)
}

err = d.Repo.Refresh(ctx)
if err != nil {
t.Error(err)
}

if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
// Check error not nil, manifest counters remain the same
checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)
} else {
t.Error("Sync must fail because of invalid manifest")
}

// Fix manifest
err = d.WithWorkingClone(ctx, func(checkout *git.Checkout) error {
ctx, cancel := context.WithTimeout(ctx, 5000*time.Second)
defer cancel()

absolutePath := path.Join(checkout.Dir(), "error_manifest.yaml")
if err := ioutil.WriteFile(absolutePath, []byte("# Just comment"), 0600); err != nil {
return err
}
commitAction := git.CommitAction{Author: "", Message: "test fix commit"}
err = checkout.CommitAndPush(ctx, commitAction, nil, true)
if err != nil {
return err
}
return err
})

if err != nil {
t.Fatal(err)
}

err = d.Repo.Refresh(ctx)
if err != nil {
t.Error(err)
}

if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
t.Error(err)
}
// Check 0 manifest error stats
checkSyncManifestsMetrics(t, len(expectedResourceIDs), 0)

// Emulate sync errors
k8s.SyncFunc = func(def cluster.SyncSet) error {
return cluster.SyncError{
cluster.ResourceError{resource.MustParseID("mynamespace:deployment/depl1"), "src1", fmt.Errorf("Error1")},
cluster.ResourceError{resource.MustParseID("mynamespace:deployment/depl2"), "src2", fmt.Errorf("Error2")},
}
}

if err := d.Sync(ctx, time.Now().UTC(), "HEAD", syncState); err != nil {
t.Error(err)
}

// Check 2 sync error in stats
checkSyncManifestsMetrics(t, len(expectedResourceIDs)-2, 2)
}

0 comments on commit 28db2f5

Please sign in to comment.