Skip to content

Commit

Permalink
added collector type so we could monitor them independently
Browse files Browse the repository at this point in the history
removed recorder test as the logic is being tested within the collector lifecycle
  • Loading branch information
enekofb committed Jul 3, 2023
1 parent 35779e4 commit f828883
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 60 deletions.
14 changes: 8 additions & 6 deletions pkg/query/collector/metrics/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

const collectorSubsystem = "collector"
const (
collectorSubsystem = "collector"
)

var clusterWatcher = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: collectorSubsystem,
Name: "cluster_watcher",
Help: "number of active cluster watchers by watcher status",
}, []string{"status"})
}, []string{"collector", "status"})

func init() {
prometheus.MustRegister(clusterWatcher)
}

// ClusterWatcherDecrease decrements the collector_cluster_watcher gauge for the given collector and status.
func ClusterWatcherDecrease(status string) {
clusterWatcher.WithLabelValues(status).Dec()
func ClusterWatcherDecrease(collector string, status string) {
clusterWatcher.WithLabelValues(collector, status).Dec()
}

// ClusterWatcherIncrease increments the collector_cluster_watcher gauge for the given collector and status.
func ClusterWatcherIncrease(status string) {
clusterWatcher.WithLabelValues(status).Inc()
func ClusterWatcherIncrease(collector string, status string) {
clusterWatcher.WithLabelValues(collector, status).Inc()
}
51 changes: 0 additions & 51 deletions pkg/query/collector/metrics/recorder_test.go

This file was deleted.

19 changes: 18 additions & 1 deletion pkg/query/collector/watching.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"github.com/weaveworks/weave-gitops-enterprise/pkg/query/collector/metrics"

"github.com/go-logr/logr"
"github.com/weaveworks/weave-gitops-enterprise/pkg/query/collector/clusters"
"github.com/weaveworks/weave-gitops/core/clustersmngr/cluster"
Expand Down Expand Up @@ -90,13 +92,24 @@ outer:
// child is the record the watching collector keeps for a single cluster
// watcher: how to cancel it and what status it is currently in.
type child struct {
// Starter is embedded, so its methods are promoted onto child.
Starter
// cancel cancels the context that was created for this watcher.
cancel context.CancelFunc
// collector is the owning collector's name; it is passed as the
// "collector" label when watcher metrics are recorded.
collector string
// status is the watcher's current status; it is written by setStatus.
status string
// lastStatusChange is the time at which setStatus last updated status.
lastStatusChange time.Time
}

// setStatus records s as the watcher's current status, stamps the time of the
// change, and keeps the cluster watcher gauge in step with the transition.
//
// The gauge only tracks statuses other than ClusterWatchingStopped: entering
// the stopped status never increments it. For the bookkeeping to stay
// balanced, leaving the stopped status (or the initial empty status) must not
// decrement it either, otherwise that series would drift below zero.
func (c *child) setStatus(s string) {
	// Undo the increment made when the previous status was entered, if one
	// was made at all.
	if c.status != "" && c.status != ClusterWatchingStopped {
		metrics.ClusterWatcherDecrease(c.collector, c.status)
	}

	c.lastStatusChange = time.Now()
	c.status = s

	// Stopped watchers are not counted.
	if c.status != ClusterWatchingStopped {
		metrics.ClusterWatcherIncrease(c.collector, c.status)
	}
}

// watchingCollector supervises watchers, starting one per cluster it
Expand Down Expand Up @@ -139,7 +152,9 @@ func (w *watchingCollector) watch(cluster cluster.Cluster) (reterr error) {
}

// make the record, so status works
c := &child{}
c := &child{
collector: w.name,
}
c.setStatus(ClusterWatchingStarting)
childctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
Expand Down Expand Up @@ -193,6 +208,8 @@ func (w *watchingCollector) watch(cluster cluster.Cluster) (reterr error) {
// TODO remove from map?
w.clusterWatchersMu.Lock()
c.setStatus(ClusterWatchingStopped)
w.clusterWatchers[clusterName] = nil

w.clusterWatchersMu.Unlock()
}()

Expand Down
60 changes: 58 additions & 2 deletions pkg/query/collector/watching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package collector
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"sync/atomic"
"testing"

"github.com/weaveworks/weave-gitops-enterprise/pkg/metrics"

"github.com/go-logr/logr"
"github.com/go-logr/logr/testr"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -167,6 +171,7 @@ func TestClusterWatcher_Watch(t *testing.T) {
log := testr.New(t)
opts := CollectorOpts{
Log: log,
Name: "objects",
NewWatcherFunc: newFakeWatcher,
ServiceAccount: ImpersonateServiceAccount{
Namespace: "flux-system",
Expand All @@ -179,6 +184,10 @@ func TestClusterWatcher_Watch(t *testing.T) {

c := makeValidFakeCluster("testcluster")

metrics.NewPrometheusServer(metrics.Options{
ServerAddress: "localhost:8080",
})

tests := []struct {
name string
cluster cluster.Cluster
Expand All @@ -194,27 +203,55 @@ func TestClusterWatcher_Watch(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
err = collector.watch(tt.cluster)
g.Expect(err).To(BeNil())
t.Cleanup(func() {
err := collector.unwatch(c.GetName())
if err != nil {
t.Fatal(err)
}
})

if tt.errPattern != "" {
g.Expect(err).To(MatchError(MatchRegexp(tt.errPattern)))
return
}

if tt.errPattern == "" {
g.Eventually(func() bool {
s, err := collector.Status(tt.cluster.GetName())
return err == nil && s == ClusterWatchingStarted
}, "2s", "0.2s").Should(BeTrue())
assertMetrics(g, []string{
`collector_cluster_watcher{collector="objects",status="starting"} 0`,
`collector_cluster_watcher{collector="objects",status="started"} 1`,
})
}
})
}
}

// assertMetrics fetches http://localhost:8080/metrics and asserts that every
// entry of expMetrics appears verbatim in the response body.
//
// g is the gomega instance used for the assertions; expMetrics holds the exact
// exposition lines (metric name, labels and value) that must be present.
func assertMetrics(g *WithT, expMetrics []string) {
	req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/metrics", nil)
	g.Expect(err).NotTo(HaveOccurred())
	resp, err := http.DefaultClient.Do(req)
	g.Expect(err).NotTo(HaveOccurred())
	// Always close the body so the underlying connection is released instead
	// of leaking one per assertion.
	defer resp.Body.Close()

	b, err := io.ReadAll(resp.Body)
	g.Expect(err).NotTo(HaveOccurred())
	// Named body (not metrics) so the imported metrics package is not shadowed.
	body := string(b)

	for _, expMetric := range expMetrics {
		// The scraped output must contain the expected series and value.
		g.Expect(body).To(ContainSubstring(expMetric))
	}
}

func TestClusterWatcher_Unwatch(t *testing.T) {
g := NewGomegaWithT(t)
log := testr.New(t)
clusterName := "testCluster"

metrics.NewPrometheusServer(metrics.Options{
ServerAddress: "localhost:8080",
})

tests := []struct {
name string
clusterName string
Expand All @@ -240,6 +277,7 @@ func TestClusterWatcher_Unwatch(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
opts := CollectorOpts{
Log: log,
Name: "objects",
NewWatcherFunc: newFakeWatcher,
ServiceAccount: ImpersonateServiceAccount{
Namespace: "flux-system",
Expand All @@ -252,7 +290,6 @@ func TestClusterWatcher_Unwatch(t *testing.T) {

c := makeValidFakeCluster(clusterName)
g.Expect(collector.watch(c)).To(Succeed())

if tt.errPattern == "" {
g.Expect(err).To(BeNil())
g.Eventually(func() bool {
Expand All @@ -262,13 +299,25 @@ func TestClusterWatcher_Unwatch(t *testing.T) {
}
err = collector.unwatch(tt.clusterName)
if tt.errPattern != "" {
t.Cleanup(func() {
err := collector.unwatch(c.GetName())
if err != nil {
t.Fatal(err)
}
})

g.Expect(err).To(MatchError(MatchRegexp(tt.errPattern)))
return
} else {
// fetching the status after it's unwatched should
// error, since it will have been forgotten.
_, err := collector.Status(tt.clusterName)
g.Expect(err).To(HaveOccurred())

assertMetrics(g, []string{
`collector_cluster_watcher{collector="objects",status="starting"} 0`,
`collector_cluster_watcher{collector="objects",status="started"} 0`,
})
}
})
}
Expand All @@ -279,6 +328,7 @@ func TestClusterWatcher_Status(t *testing.T) {
log := testr.New(t)
options := CollectorOpts{
Log: log,
Name: "objects",
NewWatcherFunc: newFakeWatcher,
ServiceAccount: ImpersonateServiceAccount{
Namespace: "flux-system",
Expand All @@ -293,6 +343,12 @@ func TestClusterWatcher_Status(t *testing.T) {
c := makeValidFakeCluster(existingClusterName)
err = collector.watch(c)
g.Expect(err).To(BeNil())
t.Cleanup(func() {
err := collector.unwatch(c.GetName())
if err != nil {
t.Fatal(err)
}
})

tests := []struct {
name string
Expand Down

0 comments on commit f828883

Please sign in to comment.