From 500c7d6567e3843f7e38bbc07fe47e761368bae4 Mon Sep 17 00:00:00 2001 From: Gianni Gambetti <99784476+ggambetti@users.noreply.github.com> Date: Thu, 14 Apr 2022 12:27:04 -0400 Subject: [PATCH 1/3] Adds Shutdown to Metrics API Short lived applications run the risk of losing metrics that are generated near the end of their lifetimes because MetricSinks can and do buffer data locally. The exact amount of data loss depends on MetricSink implementation, caller configuration, and the timing of the last metric sync. This adds a Shutdown function to the package API and the MetricSink interface. Shutdown flushes locally buffered data to storage, and then frees resources allocated to the MetricSink. Currently not all MetricSinks support exiting, and so resource release when calling Shutdown is best effort. Since Shutdown is intended for use immediately prior to application exit this is deemed acceptable, since resource leakage is minimized in this case. --- circonus/circonus.go | 7 +++++++ circonus/circonus_test.go | 7 +++++++ const_unix.go | 1 + const_windows.go | 1 + datadog/dogstatsd.go | 4 ++++ datadog/dogstatsd_test.go | 5 +++++ inmem.go | 4 ++++ metrics.go | 4 ++++ prometheus/prometheus.go | 7 ++++++- prometheus/prometheus_test.go | 12 ++++++++++-- sink.go | 10 ++++++++++ sink_test.go | 1 + start.go | 10 ++++++++++ 13 files changed, 70 insertions(+), 3 deletions(-) diff --git a/circonus/circonus.go b/circonus/circonus.go index eb41b99..2892d24 100644 --- a/circonus/circonus.go +++ b/circonus/circonus.go @@ -97,6 +97,13 @@ func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []m s.metrics.RecordValue(flatKey, float64(val)) } +func (s *CirconusSink) Shutdown() { + // The used version of the circonus metrics library does not support a shutdown operation. + // Instead we call Flush which blocks until metrics are submitted to storage, and then exit + // as the README examples do. + s.metrics.Flush() +} + // Flattens key to Circonus metric name func (s *CirconusSink) flattenKey(parts []string) string { joined := strings.Join(parts, "`") diff --git a/circonus/circonus_test.go b/circonus/circonus_test.go index 2644d57..77e9f57 100644 --- a/circonus/circonus_test.go +++ b/circonus/circonus_test.go @@ -8,6 +8,8 @@ import ( "net/http/httptest" "strings" "testing" + + "github.com/armon/go-metrics" ) func TestNewCirconusSink(t *testing.T) { @@ -152,3 +154,8 @@ func TestAddSample(t *testing.T) { } } + +func TestMetricSinkInterface(t *testing.T) { + var cs *CirconusSink + _ = metrics.MetricSink(cs) +} diff --git a/const_unix.go b/const_unix.go index 31098dd..511202d 100644 --- a/const_unix.go +++ b/const_unix.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package metrics diff --git a/const_windows.go b/const_windows.go index 38136af..6bb1897 100644 --- a/const_windows.go +++ b/const_windows.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package metrics diff --git a/datadog/dogstatsd.go b/datadog/dogstatsd.go index fe021d0..637ddf8 100644 --- a/datadog/dogstatsd.go +++ b/datadog/dogstatsd.go @@ -120,6 +120,10 @@ func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels [] s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate) } +func (s *DogStatsdSink) Shutdown() { + s.client.Close() +} + func (s *DogStatsdSink) getFlatkeyAndCombinedLabels(key []string, labels []metrics.Label) (string, []string) { key, parsedLabels := s.parseKey(key) flatKey := s.flattenKey(key) diff --git a/datadog/dogstatsd_test.go b/datadog/dogstatsd_test.go index cd3f833..3545e8d 100644 --- a/datadog/dogstatsd_test.go +++ b/datadog/dogstatsd_test.go @@ -151,3 +151,8 @@ func assertServerMatchesExpected(t *testing.T, server *net.UDPConn, buf []byte, t.Fatalf("Line %s does not match expected: %s", string(msg), expected) } } + +func TestMetricSinkInterface(t *testing.T) { + var dd *DogStatsdSink + _ = metrics.MetricSink(dd) +} diff --git a/inmem.go b/inmem.go index 7c427ac..a52d0ff 100644 --- a/inmem.go +++ b/inmem.go @@ -230,6 +230,10 @@ func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Labe agg.Ingest(float64(val), i.rateDenom) } +func (i *InmemSink) Shutdown() { + // Do nothing. InmemSink does not have cleanup associated with shutdown. +} + // Data is used to retrieve all the aggregated metrics // Intervals may be in use, and a read lock should be acquired func (i *InmemSink) Data() []*IntervalMetrics { diff --git a/metrics.go b/metrics.go index 6753b13..047657b 100644 --- a/metrics.go +++ b/metrics.go @@ -172,6 +172,10 @@ func (m *Metrics) UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabe } } +func (m *Metrics) Shutdown() { + m.sink.Shutdown() +} + // labelIsAllowed return true if a should be included in metric // the caller should lock m.filterLock while calling this method func (m *Metrics) labelIsAllowed(label *Label) bool { diff --git a/prometheus/prometheus.go b/prometheus/prometheus.go index 5a8282f..9789e20 100644 --- a/prometheus/prometheus.go +++ b/prometheus/prometheus.go @@ -1,3 +1,4 @@ +//go:build go1.9 // +build go1.9 package prometheus @@ -20,7 +21,7 @@ var ( // PrometheusSink. DefaultPrometheusOpts = PrometheusOpts{ Expiration: 60 * time.Second, - Name: "default_prometheus_sink", + Name: "default_prometheus_sink", } ) @@ -393,6 +394,10 @@ func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labe } } +// Shutdown is not implemented. PrometheusSink is in memory storage. +func (p *PrometheusSink) Shutdown() { +} + // PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address // on an interval. type PrometheusPushSink struct { diff --git a/prometheus/prometheus_test.go b/prometheus/prometheus_test.go index 6190874..321a7b9 100644 --- a/prometheus/prometheus_test.go +++ b/prometheus/prometheus_test.go @@ -54,6 +54,7 @@ func TestNewPrometheusSink(t *testing.T) { t.Fatalf("Unregister(sink) = false, want true") } } + // TestMultiplePrometheusSink tests registering multiple sinks on the same registerer with different descriptors func TestMultiplePrometheusSink(t *testing.T) { gaugeDef := GaugeDefinition{ @@ -66,14 +67,14 @@ func TestMultiplePrometheusSink(t *testing.T) { GaugeDefinitions: append([]GaugeDefinition{}, gaugeDef), SummaryDefinitions: append([]SummaryDefinition{}), CounterDefinitions: append([]CounterDefinition{}), - Name: "sink1", + Name: "sink1", } sink1, err := NewPrometheusSinkFrom(cfg) if err != nil { t.Fatalf("err = %v, want nil", err) } - + reg := prometheus.DefaultRegisterer if reg == nil { t.Fatalf("Expected default register to be non nil, got nil.") @@ -359,3 +360,10 @@ func TestDefinitionsWithLabels(t *testing.T) { return true }) } + +func TestMetricSinkInterface(t *testing.T) { + var ps *PrometheusSink + _ = metrics.MetricSink(ps) + var pps *PrometheusPushSink + _ = metrics.MetricSink(pps) +} diff --git a/sink.go b/sink.go index 0b7d6e4..4e6a8df 100644 --- a/sink.go +++ b/sink.go @@ -22,6 +22,9 @@ type MetricSink interface { // Samples are for timing information, where quantiles are used AddSample(key []string, val float32) AddSampleWithLabels(key []string, val float32, labels []Label) + + // Shutdown the sink, flushing data, and performing cleanup as necessary. + Shutdown() } // BlackholeSink is used to just blackhole messages @@ -34,6 +37,7 @@ func (*BlackholeSink) IncrCounter(key []string, val float32) func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {} func (*BlackholeSink) AddSample(key []string, val float32) {} func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {} +func (*BlackholeSink) Shutdown() {} // FanoutSink is used to sink to fanout values to multiple sinks type FanoutSink []MetricSink @@ -74,6 +78,12 @@ func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Lab } } +func (fh FanoutSink) Shutdown() { + for _, s := range fh { + s.Shutdown() + } +} + // sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided // by each sink type type sinkURLFactoryFunc func(*url.URL) (MetricSink, error) diff --git a/sink_test.go b/sink_test.go index 36da370..e10f678 100644 --- a/sink_test.go +++ b/sink_test.go @@ -63,6 +63,7 @@ func (m *MockSink) AddSampleWithLabels(key []string, val float32, labels []Label m.vals = append(m.vals, val) m.labels = append(m.labels, labels) } +func (m *MockSink) Shutdown() {} func TestFanoutSink_Gauge(t *testing.T) { m1 := &MockSink{} diff --git a/start.go b/start.go index 6aa0bd3..7747d0b 100644 --- a/start.go +++ b/start.go @@ -144,3 +144,13 @@ func UpdateFilter(allow, block []string) { func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) { globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels) } + +// Shutdown flushes and disables metric collection, blocking while waiting for this to complete. +// WARNING: Not all MetricSink backends support this functionality, and calling this will cause resource leaks. +// This is intended for use immediately prior to application exit. +func Shutdown() { + m := globalMetrics.Load().(*Metrics) + // Replace global metrics with the BlackholeSink like how init setup the library. + globalMetrics.Store(&Metrics{sink: &BlackholeSink{}}) + m.Shutdown() +} From 3e16282fa96d3e841cf0ada358cc442f0fd3c962 Mon Sep 17 00:00:00 2001 From: Gianni Gambetti <99784476+ggambetti@users.noreply.github.com> Date: Thu, 14 Apr 2022 13:42:02 -0400 Subject: [PATCH 2/3] gofmt --- start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/start.go b/start.go index 7747d0b..4576f5a 100644 --- a/start.go +++ b/start.go @@ -150,7 +150,7 @@ func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) // This is intended for use immediately prior to application exit. func Shutdown() { m := globalMetrics.Load().(*Metrics) - // Replace global metrics with the BlackholeSink like how init setup the library. + // Replace global metrics with the BlackholeSink like how init setup the library. globalMetrics.Store(&Metrics{sink: &BlackholeSink{}}) m.Shutdown() } From 4fbf9d3c39e6e39664f975b7d9bb4f0cfb3628a6 Mon Sep 17 00:00:00 2001 From: Gianni Gambetti <99784476+ggambetti@users.noreply.github.com> Date: Tue, 19 Apr 2022 18:22:29 -0400 Subject: [PATCH 3/3] Updated documentation and adds tests --- circonus/circonus.go | 6 ++++-- const_unix.go | 1 - const_windows.go | 1 - datadog/dogstatsd.go | 1 + prometheus/prometheus.go | 4 ++++ sink.go | 4 +++- sink_test.go | 14 ++++++++++---- start.go | 8 +++++--- start_test.go | 20 ++++++++++++++++++++ 9 files changed, 47 insertions(+), 12 deletions(-) diff --git a/circonus/circonus.go b/circonus/circonus.go index 2892d24..9f1ea18 100644 --- a/circonus/circonus.go +++ b/circonus/circonus.go @@ -97,9 +97,11 @@ func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []m s.metrics.RecordValue(flatKey, float64(val)) } +// Shutdown blocks while flushing metrics to the backend. func (s *CirconusSink) Shutdown() { - // The used version of the circonus metrics library does not support a shutdown operation. - // Instead we call Flush which blocks until metrics are submitted to storage, and then exit + // The version of circonus metrics in go.mod (v2.3.1), and the current + // version (v3.4.6) do not support a shutdown operation. Instead we call + // Flush which blocks until metrics are submitted to storage, and then exit // as the README examples do. s.metrics.Flush() } diff --git a/const_unix.go b/const_unix.go index 511202d..31098dd 100644 --- a/const_unix.go +++ b/const_unix.go @@ -1,4 +1,3 @@ -//go:build !windows // +build !windows package metrics diff --git a/const_windows.go b/const_windows.go index 6bb1897..38136af 100644 --- a/const_windows.go +++ b/const_windows.go @@ -1,4 +1,3 @@ -//go:build windows // +build windows package metrics diff --git a/datadog/dogstatsd.go b/datadog/dogstatsd.go index 637ddf8..c980004 100644 --- a/datadog/dogstatsd.go +++ b/datadog/dogstatsd.go @@ -120,6 +120,7 @@ func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels [] s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate) } +// Shutdown disables further metric collection, blocks to flush data, and tears down the sink. func (s *DogStatsdSink) Shutdown() { s.client.Close() } diff --git a/prometheus/prometheus.go b/prometheus/prometheus.go index 9789e20..f89cfd9 100644 --- a/prometheus/prometheus.go +++ b/prometheus/prometheus.go @@ -451,6 +451,10 @@ func (s *PrometheusPushSink) flushMetrics() { }() } +// Shutdown tears down the PrometheusPushSink, and blocks while flushing metrics to the backend. func (s *PrometheusPushSink) Shutdown() { close(s.stopChan) + // Closing the channel only stops the running goroutine that pushes metrics. + // To minimize the chance of data loss pusher.Push is called one last time. + s.pusher.Push() } diff --git a/sink.go b/sink.go index 4e6a8df..b839844 100644 --- a/sink.go +++ b/sink.go @@ -23,7 +23,9 @@ type MetricSink interface { AddSample(key []string, val float32) AddSampleWithLabels(key []string, val float32, labels []Label) - // Shutdown the sink, flushing data, and performing cleanup as necessary. + // Shutdown the metric sink, flush metrics to storage, and cleanup resources. + // Called immediately prior to application exit. Implementations must block + // until metrics are flushed to storage. Shutdown() } diff --git a/sink_test.go b/sink_test.go index e10f678..dbaab7d 100644 --- a/sink_test.go +++ b/sink_test.go @@ -10,9 +10,10 @@ import ( type MockSink struct { lock sync.Mutex - keys [][]string - vals []float32 - labels [][]Label + shutdown bool + keys [][]string + vals []float32 + labels [][]Label } func (m *MockSink) getKeys() [][]string { @@ -63,7 +64,12 @@ func (m *MockSink) AddSampleWithLabels(key []string, val float32, labels []Label m.vals = append(m.vals, val) m.labels = append(m.labels, labels) } -func (m *MockSink) Shutdown() {} +func (m *MockSink) Shutdown() { + m.lock.Lock() + defer m.lock.Unlock() + + m.shutdown = true +} func TestFanoutSink_Gauge(t *testing.T) { m1 := &MockSink{} diff --git a/start.go b/start.go index 4576f5a..38976f8 100644 --- a/start.go +++ b/start.go @@ -145,12 +145,14 @@ func UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels []string) globalMetrics.Load().(*Metrics).UpdateFilterAndLabels(allow, block, allowedLabels, blockedLabels) } -// Shutdown flushes and disables metric collection, blocking while waiting for this to complete. -// WARNING: Not all MetricSink backends support this functionality, and calling this will cause resource leaks. +// Shutdown disables metric collection, then blocks while attempting to flush metrics to storage. +// WARNING: Not all MetricSink backends support this functionality, and calling this will cause them to leak resources. // This is intended for use immediately prior to application exit. func Shutdown() { m := globalMetrics.Load().(*Metrics) - // Replace global metrics with the BlackholeSink like how init setup the library. + // Swap whatever MetricSink is currently active with a BlackholeSink. Callers must not have a + // reason to expect that calls to the library will successfully collect metrics after Shutdown + // has been called. globalMetrics.Store(&Metrics{sink: &BlackholeSink{}}) m.Shutdown() } diff --git a/start_test.go b/start_test.go index 8ff5ca0..38a7b26 100644 --- a/start_test.go +++ b/start_test.go @@ -192,6 +192,26 @@ func Test_GlobalMetrics_UpdateFilter(t *testing.T) { } } +func Test_GlobalMetrics_Shutdown(t *testing.T) { + s := &MockSink{} + m := &Metrics{sink: s} + globalMetrics.Store(m) + + Shutdown() + + loaded := globalMetrics.Load() + metrics, ok := loaded.(*Metrics) + if !ok { + t.Fatalf("Expected globalMetrics to contain a Metrics pointer, but found: %v", loaded) + } + if metrics == m { + t.Errorf("Calling shutdown should have replaced the Metrics struct stored in globalMetrics") + } + if !s.shutdown { + t.Errorf("Expected Shutdown to have been called on MockSink") + } +} + // Benchmark_GlobalMetrics_Direct/direct-8 5000000 278 ns/op // Benchmark_GlobalMetrics_Direct/atomic.Value-8 5000000 235 ns/op func Benchmark_GlobalMetrics_Direct(b *testing.B) {