Skip to content

Commit

Permalink
Adds Shutdown to Metrics API
Browse files Browse the repository at this point in the history
Short lived applications run the risk of losing metrics that are
generated near the end of their lifetimes because MetricSinks can
and do buffer data locally. The exact amount of data loss depends on
MetricSink implementation, caller configuration, and the timing of
the last metric sync.

This adds a Shutdown function to the package API and the MetricSink
interface. Shutdown flushes locally buffered data to storage, and
then frees resources allocated to the MetricSink.

Currently not all MetricSinks support exiting, and so resource
release when calling Shutdown is best effort. Since Shutdown is
intended for use immediately prior to application exit this is deemed
acceptable, since resource leakage is minimized in this case.
  • Loading branch information
ggambetti committed Apr 14, 2022
1 parent d1e5690 commit 500c7d6
Show file tree
Hide file tree
Showing 13 changed files with 70 additions and 3 deletions.
7 changes: 7 additions & 0 deletions circonus/circonus.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []m
s.metrics.RecordValue(flatKey, float64(val))
}

func (s *CirconusSink) Shutdown() {
// The used version of the circonus metrics library does not support a shutdown operation.
// Instead we call Flush which blocks until metrics are submitted to storage, and then exit
// as the README examples do.
s.metrics.Flush()
}

// Flattens key to Circonus metric name
func (s *CirconusSink) flattenKey(parts []string) string {
joined := strings.Join(parts, "`")
Expand Down
7 changes: 7 additions & 0 deletions circonus/circonus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"net/http/httptest"
"strings"
"testing"

"github.com/armon/go-metrics"
)

func TestNewCirconusSink(t *testing.T) {
Expand Down Expand Up @@ -152,3 +154,8 @@ func TestAddSample(t *testing.T) {

}
}

func TestMetricSinkInterface(t *testing.T) {
var cs *CirconusSink
_ = metrics.MetricSink(cs)
}
1 change: 1 addition & 0 deletions const_unix.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !windows
// +build !windows

package metrics
Expand Down
1 change: 1 addition & 0 deletions const_windows.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build windows
// +build windows

package metrics
Expand Down
4 changes: 4 additions & 0 deletions datadog/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels []
s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate)
}

func (s *DogStatsdSink) Shutdown() {
s.client.Close()
}

func (s *DogStatsdSink) getFlatkeyAndCombinedLabels(key []string, labels []metrics.Label) (string, []string) {
key, parsedLabels := s.parseKey(key)
flatKey := s.flattenKey(key)
Expand Down
5 changes: 5 additions & 0 deletions datadog/dogstatsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,8 @@ func assertServerMatchesExpected(t *testing.T, server *net.UDPConn, buf []byte,
t.Fatalf("Line %s does not match expected: %s", string(msg), expected)
}
}

func TestMetricSinkInterface(t *testing.T) {
var dd *DogStatsdSink
_ = metrics.MetricSink(dd)
}
4 changes: 4 additions & 0 deletions inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Labe
agg.Ingest(float64(val), i.rateDenom)
}

func (i *InmemSink) Shutdown() {
// Do nothing. InmemSink does not have cleanup associated with shutdown.
}

// Data is used to retrieve all the aggregated metrics
// Intervals may be in use, and a read lock should be acquired
func (i *InmemSink) Data() []*IntervalMetrics {
Expand Down
4 changes: 4 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func (m *Metrics) UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabe
}
}

func (m *Metrics) Shutdown() {
m.sink.Shutdown()
}

// labelIsAllowed return true if a should be included in metric
// the caller should lock m.filterLock while calling this method
func (m *Metrics) labelIsAllowed(label *Label) bool {
Expand Down
7 changes: 6 additions & 1 deletion prometheus/prometheus.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build go1.9
// +build go1.9

package prometheus
Expand All @@ -20,7 +21,7 @@ var (
// PrometheusSink.
DefaultPrometheusOpts = PrometheusOpts{
Expiration: 60 * time.Second,
Name: "default_prometheus_sink",
Name: "default_prometheus_sink",
}
)

Expand Down Expand Up @@ -393,6 +394,10 @@ func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labe
}
}

// Shutdown is not implemented. PrometheusSink is in memory storage.
func (p *PrometheusSink) Shutdown() {
}

// PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address
// on an interval.
type PrometheusPushSink struct {
Expand Down
12 changes: 10 additions & 2 deletions prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestNewPrometheusSink(t *testing.T) {
t.Fatalf("Unregister(sink) = false, want true")
}
}

// TestMultiplePrometheusSink tests registering multiple sinks on the same registerer with different descriptors
func TestMultiplePrometheusSink(t *testing.T) {
gaugeDef := GaugeDefinition{
Expand All @@ -66,14 +67,14 @@ func TestMultiplePrometheusSink(t *testing.T) {
GaugeDefinitions: append([]GaugeDefinition{}, gaugeDef),
SummaryDefinitions: append([]SummaryDefinition{}),
CounterDefinitions: append([]CounterDefinition{}),
Name: "sink1",
Name: "sink1",
}

sink1, err := NewPrometheusSinkFrom(cfg)
if err != nil {
t.Fatalf("err = %v, want nil", err)
}

reg := prometheus.DefaultRegisterer
if reg == nil {
t.Fatalf("Expected default register to be non nil, got nil.")
Expand Down Expand Up @@ -359,3 +360,10 @@ func TestDefinitionsWithLabels(t *testing.T) {
return true
})
}

func TestMetricSinkInterface(t *testing.T) {
var ps *PrometheusSink
_ = metrics.MetricSink(ps)
var pps *PrometheusPushSink
_ = metrics.MetricSink(pps)
}
10 changes: 10 additions & 0 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type MetricSink interface {
// Samples are for timing information, where quantiles are used
AddSample(key []string, val float32)
AddSampleWithLabels(key []string, val float32, labels []Label)

// Shutdown the sink, flushing data, and performing cleanup as necessary.
Shutdown()
}

// BlackholeSink is used to just blackhole messages
Expand All @@ -34,6 +37,7 @@ func (*BlackholeSink) IncrCounter(key []string, val float32)
func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {}
func (*BlackholeSink) AddSample(key []string, val float32) {}
func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {}
func (*BlackholeSink) Shutdown() {}

// FanoutSink is used to sink to fanout values to multiple sinks
type FanoutSink []MetricSink
Expand Down Expand Up @@ -74,6 +78,12 @@ func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Lab
}
}

func (fh FanoutSink) Shutdown() {
for _, s := range fh {
s.Shutdown()
}
}

// sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided
// by each sink type
type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
Expand Down
1 change: 1 addition & 0 deletions sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (m *MockSink) AddSampleWithLabels(key []string, val float32, labels []Label
m.vals = append(m.vals, val)
m.labels = append(m.labels, labels)
}
func (m *MockSink) Shutdown() {}

func TestFanoutSink_Gauge(t *testing.T) {
m1 := &MockSink{}
Expand Down
10 changes: 10 additions & 0 deletions start.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,13 @@ func UpdateFilter(allow, block []string) {
func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) {
globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels)
}

// Shutdown flushes and disables metric collection, blocking while waiting for this to complete.
// WARNING: Not all MetricSink backends support this functionality, and calling this will cause resource leaks.
// This is intended for use immediately prior to application exit.
func Shutdown() {
m := globalMetrics.Load().(*Metrics)
// Replace global metrics with the BlackholeSink like how init setup the library.
globalMetrics.Store(&Metrics{sink: &BlackholeSink{}})
m.Shutdown()
}

0 comments on commit 500c7d6

Please sign in to comment.