-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Configurable basicstats #3580
Configurable basicstats #3580
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,30 @@ | ||
package basicstats | ||
|
||
import ( | ||
"log" | ||
"math" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/plugins/aggregators" | ||
) | ||
|
||
type BasicStats struct { | ||
cache map[uint64]aggregate | ||
Stats []string `toml:"stats"` | ||
|
||
cache map[uint64]aggregate | ||
statsConfig *configuredStats | ||
} | ||
|
||
type configuredStats struct { | ||
count bool | ||
min bool | ||
max bool | ||
mean bool | ||
variance bool | ||
stdev bool | ||
} | ||
|
||
func NewBasicStats() telegraf.Aggregator { | ||
func NewBasicStats() *BasicStats { | ||
mm := &BasicStats{} | ||
mm.Reset() | ||
return mm | ||
|
@@ -114,23 +127,101 @@ func (m *BasicStats) Add(in telegraf.Metric) { | |
} | ||
|
||
func (m *BasicStats) Push(acc telegraf.Accumulator) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the first pass, only using the configuration to control which stats are pushed. I'm not trying to optimize the computation of each stat as they are added. The balance between checking the configuration per calculation seems like a micro optimization we can make later. |
||
|
||
config := getConfiguredStats(m) | ||
|
||
for _, aggregate := range m.cache { | ||
fields := map[string]interface{}{} | ||
for k, v := range aggregate.fields { | ||
fields[k+"_count"] = v.count | ||
fields[k+"_min"] = v.min | ||
fields[k+"_max"] = v.max | ||
fields[k+"_mean"] = v.mean | ||
|
||
if config.count { | ||
fields[k+"_count"] = v.count | ||
} | ||
if config.min { | ||
fields[k+"_min"] = v.min | ||
} | ||
if config.max { | ||
fields[k+"_max"] = v.max | ||
} | ||
if config.mean { | ||
fields[k+"_mean"] = v.mean | ||
} | ||
|
||
//v.count always >=1 | ||
if v.count > 1 { | ||
variance := v.M2 / (v.count - 1) | ||
fields[k+"_s2"] = variance | ||
fields[k+"_stdev"] = math.Sqrt(variance) | ||
|
||
if config.variance { | ||
fields[k+"_s2"] = variance | ||
} | ||
if config.stdev { | ||
fields[k+"_stdev"] = math.Sqrt(variance) | ||
} | ||
} | ||
//if count == 1 StdDev = infinite => so I won't send data | ||
} | ||
acc.AddFields(aggregate.name, fields, aggregate.tags) | ||
|
||
if len(fields) > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If an empty array is specified in the configuration, then we push nothing. I plan to use this with the field specific configuration, in other words, only produce aggregations for known fields. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you can determine an empty array from a unset array with our current toml parsing, so you should always have at least one field. Remember you can use the regular measurement filtering options to control what fields are added in the first place. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are correct. I still feel like this check isn't a bad thing. It would still happen if I passed an invalid stat, for example:
Which would produce this case. So in the interest of being robust, I think the check is worth it. |
||
acc.AddFields(aggregate.name, fields, aggregate.tags) | ||
} | ||
} | ||
} | ||
|
||
func parseStats(names []string) *configuredStats { | ||
|
||
parsed := &configuredStats{} | ||
|
||
for _, name := range names { | ||
|
||
switch name { | ||
|
||
case "count": | ||
parsed.count = true | ||
case "min": | ||
parsed.min = true | ||
case "max": | ||
parsed.max = true | ||
case "mean": | ||
parsed.mean = true | ||
case "s2": | ||
parsed.variance = true | ||
case "stdev": | ||
parsed.stdev = true | ||
|
||
default: | ||
log.Printf("W! Unrecognized basic stat '%s', ignoring", name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sample output:
|
||
} | ||
} | ||
|
||
return parsed | ||
} | ||
|
||
func defaultStats() *configuredStats { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mostly for backwards compatibility. If not specified in the config, you get the old default of everything. |
||
|
||
defaults := &configuredStats{} | ||
|
||
defaults.count = true | ||
defaults.min = true | ||
defaults.max = true | ||
defaults.mean = true | ||
defaults.variance = true | ||
defaults.stdev = true | ||
|
||
return defaults | ||
} | ||
|
||
func getConfiguredStats(m *BasicStats) *configuredStats { | ||
|
||
if m.statsConfig == nil { | ||
|
||
if m.Stats == nil { | ||
m.statsConfig = defaultStats() | ||
} else { | ||
m.statsConfig = parseStats(m.Stats) | ||
} | ||
} | ||
|
||
return m.statsConfig | ||
} | ||
|
||
func (m *BasicStats) Reset() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -149,3 +149,211 @@ func TestBasicStatsDifferentPeriods(t *testing.T) { | |
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test only aggregating count | ||
func TestBasicStatsWithOnlyCount(t *testing.T) { | ||
|
||
aggregator := NewBasicStats() | ||
aggregator.Stats = []string{"count"} | ||
|
||
aggregator.Add(m1) | ||
aggregator.Add(m2) | ||
|
||
acc := testutil.Accumulator{} | ||
aggregator.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"a_count": float64(2), | ||
"b_count": float64(2), | ||
"c_count": float64(2), | ||
"d_count": float64(2), | ||
"e_count": float64(1), | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test only aggregating minimum | ||
func TestBasicStatsWithOnlyMin(t *testing.T) { | ||
|
||
aggregator := NewBasicStats() | ||
aggregator.Stats = []string{"min"} | ||
|
||
aggregator.Add(m1) | ||
aggregator.Add(m2) | ||
|
||
acc := testutil.Accumulator{} | ||
aggregator.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"a_min": float64(1), | ||
"b_min": float64(1), | ||
"c_min": float64(2), | ||
"d_min": float64(2), | ||
"e_min": float64(200), | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test only aggregating maximum | ||
func TestBasicStatsWithOnlyMax(t *testing.T) { | ||
|
||
aggregator := NewBasicStats() | ||
aggregator.Stats = []string{"max"} | ||
|
||
aggregator.Add(m1) | ||
aggregator.Add(m2) | ||
|
||
acc := testutil.Accumulator{} | ||
aggregator.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"a_max": float64(1), | ||
"b_max": float64(3), | ||
"c_max": float64(4), | ||
"d_max": float64(6), | ||
"e_max": float64(200), | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test only aggregating mean | ||
func TestBasicStatsWithOnlyMean(t *testing.T) { | ||
|
||
aggregator := NewBasicStats() | ||
aggregator.Stats = []string{"mean"} | ||
|
||
aggregator.Add(m1) | ||
aggregator.Add(m2) | ||
|
||
acc := testutil.Accumulator{} | ||
aggregator.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"a_mean": float64(1), | ||
"b_mean": float64(2), | ||
"c_mean": float64(3), | ||
"d_mean": float64(4), | ||
"e_mean": float64(200), | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test only aggregating variance | ||
func TestBasicStatsWithOnlyVariance(t *testing.T) { | ||
|
||
aggregator := NewBasicStats() | ||
aggregator.Stats = []string{"s2"} | ||
|
||
aggregator.Add(m1) | ||
aggregator.Add(m2) | ||
|
||
acc := testutil.Accumulator{} | ||
aggregator.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"a_s2": float64(0), | ||
"b_s2": float64(2), | ||
"c_s2": float64(2), | ||
"d_s2": float64(8), | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test only aggregating standard deviation | ||
func TestBasicStatsWithOnlyStandardDeviation(t *testing.T) { | ||
|
||
aggregator := NewBasicStats() | ||
aggregator.Stats = []string{"stdev"} | ||
|
||
aggregator.Add(m1) | ||
aggregator.Add(m2) | ||
|
||
acc := testutil.Accumulator{} | ||
aggregator.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"a_stdev": float64(0), | ||
"b_stdev": math.Sqrt(2), | ||
"c_stdev": math.Sqrt(2), | ||
"d_stdev": math.Sqrt(8), | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test only aggregating minimum and maximum | ||
func TestBasicStatsWithMinAndMax(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Test for 2 stats |
||
|
||
aggregator := NewBasicStats() | ||
aggregator.Stats = []string{"min", "max"} | ||
|
||
aggregator.Add(m1) | ||
aggregator.Add(m2) | ||
|
||
acc := testutil.Accumulator{} | ||
aggregator.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"a_max": float64(1), //a | ||
"a_min": float64(1), | ||
"b_max": float64(3), //b | ||
"b_min": float64(1), | ||
"c_max": float64(4), //c | ||
"c_min": float64(2), | ||
"d_max": float64(6), //d | ||
"d_min": float64(2), | ||
"e_max": float64(200), //e | ||
"e_min": float64(200), | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test that if an empty array is passed, no points are pushed | ||
func TestBasicStatsWithNoStats(t *testing.T) { | ||
|
||
aggregator := NewBasicStats() | ||
aggregator.Stats = []string{} | ||
|
||
aggregator.Add(m1) | ||
aggregator.Add(m2) | ||
|
||
acc := testutil.Accumulator{} | ||
aggregator.Push(&acc) | ||
|
||
acc.AssertDoesNotContainMeasurement(t, "m1") | ||
} | ||
|
||
// Test that if an unknown stat is configured, it doesn't explode | ||
func TestBasicStatsWithUnknownStat(t *testing.T) { | ||
|
||
aggregator := NewBasicStats() | ||
aggregator.Stats = []string{"crazy"} | ||
|
||
aggregator.Add(m1) | ||
aggregator.Add(m2) | ||
|
||
acc := testutil.Accumulator{} | ||
aggregator.Push(&acc) | ||
|
||
acc.AssertDoesNotContainMeasurement(t, "m1") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could also do this as flags, but not sure it's a win