Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable basicstats #3580

Merged
merged 3 commits into from
Dec 15, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions plugins/aggregators/basicstats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,26 @@ emitting the aggregate every `period` seconds.
```toml
# Keep the aggregate basicstats of each metric passing through.
[[aggregators.basicstats]]

## General Aggregator Arguments:

## The period on which to flush & clear the aggregator.
period = "30s"

## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false

## BasicStats Arguments:

## Configures which basic stats to push as fields
stats = ["count","min","max","mean","stdev","s2"]
```

- stats
- If not specified, all stats are aggregated and pushed as fields
- If empty array, no stats are aggregated

### Measurements & Fields:

- measurement1
Expand Down
109 changes: 100 additions & 9 deletions plugins/aggregators/basicstats/basicstats.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
package basicstats

import (
"log"
"math"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
)

type BasicStats struct {
cache map[uint64]aggregate
Stats []string `toml:"stats"`

cache map[uint64]aggregate
statsConfig *configuredStats
}

type configuredStats struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also do this as flags, but not sure it's a win

count bool
min bool
max bool
mean bool
variance bool
stdev bool
}

func NewBasicStats() telegraf.Aggregator {
func NewBasicStats() *BasicStats {
mm := &BasicStats{}
mm.Reset()
return mm
Expand Down Expand Up @@ -114,23 +127,101 @@ func (m *BasicStats) Add(in telegraf.Metric) {
}

func (m *BasicStats) Push(acc telegraf.Accumulator) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the first pass, only using the configuration to control which stats are pushed. I'm not trying to optimize the computation of each stat as they are added.

The balance between checking the configuration per calculation seems like a micro optimization we can make later.


config := getConfiguredStats(m)

for _, aggregate := range m.cache {
fields := map[string]interface{}{}
for k, v := range aggregate.fields {
fields[k+"_count"] = v.count
fields[k+"_min"] = v.min
fields[k+"_max"] = v.max
fields[k+"_mean"] = v.mean

if config.count {
fields[k+"_count"] = v.count
}
if config.min {
fields[k+"_min"] = v.min
}
if config.max {
fields[k+"_max"] = v.max
}
if config.mean {
fields[k+"_mean"] = v.mean
}

//v.count always >=1
if v.count > 1 {
variance := v.M2 / (v.count - 1)
fields[k+"_s2"] = variance
fields[k+"_stdev"] = math.Sqrt(variance)

if config.variance {
fields[k+"_s2"] = variance
}
if config.stdev {
fields[k+"_stdev"] = math.Sqrt(variance)
}
}
//if count == 1 StdDev = infinite => so I won't send data
}
acc.AddFields(aggregate.name, fields, aggregate.tags)

if len(fields) > 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an empty array is specified in the configuration, then we push nothing. I plan to use this with the field specific configuration, in other words, only produce aggregations for known fields.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you can determine an empty array from a unset array with our current toml parsing, so you should always have at least one field.

Remember you can use the regular measurement filtering options to control what fields are added in the first place.

Copy link
Contributor Author

@JeffAshton JeffAshton Dec 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct. I still feel like this check isn't a bad thing. It would still happen if I passed an invalid stat, for example:

[[aggregators.basicstats]]
  period = "5s"
  drop_original = true
  stats = [ "-" ]

Which would produce this case. So in the interest of being robust, I think the check is worth it.

acc.AddFields(aggregate.name, fields, aggregate.tags)
}
}
}

func parseStats(names []string) *configuredStats {

parsed := &configuredStats{}

for _, name := range names {

switch name {

case "count":
parsed.count = true
case "min":
parsed.min = true
case "max":
parsed.max = true
case "mean":
parsed.mean = true
case "s2":
parsed.variance = true
case "stdev":
parsed.stdev = true

default:
log.Printf("W! Unrecognized basic stat '[%s]', ignoring", name)
}
}

return parsed
}

func defaultStats() *configuredStats {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly for backwards compatibility. If not specified in the config, you get the old default of everything.


defaults := &configuredStats{}

defaults.count = true
defaults.min = true
defaults.max = true
defaults.mean = true
defaults.variance = true
defaults.stdev = true

return defaults
}

func getConfiguredStats(m *BasicStats) *configuredStats {

if m.statsConfig == nil {

if m.Stats == nil {
m.statsConfig = defaultStats()
} else {
m.statsConfig = parseStats(m.Stats)
}
}

return m.statsConfig
}

func (m *BasicStats) Reset() {
Expand Down
208 changes: 208 additions & 0 deletions plugins/aggregators/basicstats/basicstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,211 @@ func TestBasicStatsDifferentPeriods(t *testing.T) {
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating count
func TestBasicStatsWithOnlyCount(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"count"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_count": float64(2),
"b_count": float64(2),
"c_count": float64(2),
"d_count": float64(2),
"e_count": float64(1),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating minimum
func TestBasicStatsWithOnlyMin(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"min"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_min": float64(1),
"b_min": float64(1),
"c_min": float64(2),
"d_min": float64(2),
"e_min": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating maximum
func TestBasicStatsWithOnlyMax(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"max"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_max": float64(1),
"b_max": float64(3),
"c_max": float64(4),
"d_max": float64(6),
"e_max": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating mean
func TestBasicStatsWithOnlyMean(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"mean"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_mean": float64(1),
"b_mean": float64(2),
"c_mean": float64(3),
"d_mean": float64(4),
"e_mean": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating variance
func TestBasicStatsWithOnlyVariance(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"s2"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_s2": float64(0),
"b_s2": float64(2),
"c_s2": float64(2),
"d_s2": float64(8),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating standard deviation
func TestBasicStatsWithOnlyStandardDeviation(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"stdev"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_stdev": float64(0),
"b_stdev": math.Sqrt(2),
"c_stdev": math.Sqrt(2),
"d_stdev": math.Sqrt(8),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating minimum and maximum
func TestBasicStatsWithMinAndMax(t *testing.T) {
Copy link
Contributor Author

@JeffAshton JeffAshton Dec 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test for 2 stats


aggregator := NewBasicStats()
aggregator.Stats = []string{"min", "max"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_max": float64(1), //a
"a_min": float64(1),
"b_max": float64(3), //b
"b_min": float64(1),
"c_max": float64(4), //c
"c_min": float64(2),
"d_max": float64(6), //d
"d_min": float64(2),
"e_max": float64(200), //e
"e_min": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test that if an empty array is passed, no points are pushed
func TestBasicStatsWithNoStats(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

acc.AssertDoesNotContainMeasurement(t, "m1")
}

// Test that if an unknown stat is configured, it doesn't explode
func TestBasicStatsWithUnknownStat(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"crazy"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

acc.AssertDoesNotContainMeasurement(t, "m1")
}