From 0378df550b9de1ec0e532856bf8632da7987bf7d Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 15 Jan 2016 16:23:04 -0700 Subject: [PATCH] fix panic on nil value in top/bottom mr funcs --- CHANGELOG.md | 1 + etc/kapacitor/kapacitor.conf | 2 +- integrations/batcher_test.go | 2 +- integrations/streamer_test.go | 14 +++++++------- map_reduce.go | 10 +++++++++- 5 files changed, 19 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 49a1a7920..55008f2f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ This way you can easily tell which versions of Telegraf, InfluxDB, Chronograf an - [#139](https://github.com/influxdata/kapacitor/issues/139): Alerta.io support thanks! @md14454 ### Bugfixes +- [#153](https://github.com/influxdata/kapacitor/issues/153): Fix panic if referencing non existant field in MapReduce function. ## v0.2.4 [2016-01-07] diff --git a/etc/kapacitor/kapacitor.conf b/etc/kapacitor/kapacitor.conf index 5a76c3846..028ed6118 100644 --- a/etc/kapacitor/kapacitor.conf +++ b/etc/kapacitor/kapacitor.conf @@ -48,7 +48,7 @@ data_dir = "/var/lib/kapacitor" # Id -- the alert Id, NODE_NAME will be replaced with the name of the node being monitored. id = "node 'NODE_NAME' in task '{{ .TaskName }}'" # The message of the alert. INTERVAL will be replaced by the interval. - message = "{{ .ID }} is dead: {{ index .Fields \"collected\" }} points/INTERVAL" + message = "{{ .ID }} is {{ if eq .Level \"OK\" }}alive{{ else }}dead{{ end }}: {{ index .Fields \"collected\" | printf \"%0.3f\" }} points/INTERVAL." [influxdb] diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index a7a3638bf..f9245b0d1 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -209,7 +209,7 @@ batch .period(10s) .every(10s) .groupBy(time(2s), 'cpu') - .mapReduce(influxql.count('value')) + .mapReduce(influxql.count('mean')) .window() .period(20s) .every(20s) diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index fa55111b7..cae9a9a9a 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -1294,7 +1294,7 @@ stream .window() .period(10s) .every(10s) - .mapReduce(influxql.count('idle')) + .mapReduce(influxql.count('value')) .alert() .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') .info(lambda: "count" > infoThreshold) @@ -1370,7 +1370,7 @@ stream .window() .period(10s) .every(10s) - .mapReduce(influxql.count('idle')) + .mapReduce(influxql.count('value')) .alert() .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') .info(lambda: "count" > 6.0) @@ -1447,7 +1447,7 @@ stream .window() .period(10s) .every(10s) - .mapReduce(influxql.count('idle')) + .mapReduce(influxql.count('value')) .alert() .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') .info(lambda: "count" > 6.0) @@ -1537,7 +1537,7 @@ stream .window() .period(10s) .every(10s) - .mapReduce(influxql.count('idle')) + .mapReduce(influxql.count('value')) .alert() .id('{{ index .Tags "host" }}') .message('kapacitor/{{ .Name }}/{{ index .Tags "host" }} is {{ .Level }}') @@ -1662,7 +1662,7 @@ stream .window() .period(10s) .every(10s) - .mapReduce(influxql.count('idle')) + .mapReduce(influxql.count('value')) .alert() .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') .info(lambda: "count" > 6.0) @@ -1738,7 +1738,7 @@ stream .window() .period(10s) .every(10s) - .mapReduce(influxql.count('idle')) + .mapReduce(influxql.count('value')) .alert() .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') .message('{{ .Level }} alert for {{ .ID }}') @@ -1821,7 +1821,7 @@ stream .window() .period(10s) .every(10s) - .mapReduce(influxql.count('idle')) + .mapReduce(influxql.count('value')) .alert() .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') .info(lambda: "count" > 6.0) diff --git a/map_reduce.go b/map_reduce.go index 7396726d9..a3271a5cb 100644 --- a/map_reduce.go +++ b/map_reduce.go @@ -116,9 +116,17 @@ func (m *MapNode) mapBatch(b models.Batch) error { inputs := make([]tsdb.MapInput, m.parallel) j := 0 for _, p := range b.Points { + value, ok := p.Fields[m.field] + if !ok { + fields := make([]string, 0, len(p.Fields)) + for field := range p.Fields { + fields = append(fields, field) + } + return fmt.Errorf("unknown field %s, available fields %v", m.field, fields) + } item := tsdb.MapItem{ Timestamp: p.Time.Unix(), - Value: p.Fields[m.field], + Value: value, Fields: p.Fields, Tags: p.Tags, }