diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9842ff008..7473af7c0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,8 +4,32 @@
 
 ### Release Notes
 
+#### Alert reset expressions
+
+Kapacitor now supports alert reset expressions.
+These keep an alert's state from being lowered in severity until its reset expression evaluates to true.
+
+Example:
+
+```go
+stream
+    |from()
+      .measurement('cpu')
+      .where(lambda: "host" == 'serverA')
+      .groupBy('host')
+    |alert()
+      .info(lambda: "value" > 60)
+      .infoReset(lambda: "value" < 10)
+      .warn(lambda: "value" > 70)
+      .warnReset(lambda: "value" < 20)
+      .crit(lambda: "value" > 80)
+      .critReset(lambda: "value" < 30)
+```
+
 ### Features
 
+- [#740](https://github.com/influxdata/kapacitor/pull/740): Support reset expressions to prevent an alert from being lowered in severity.
+
 ### Bugfixes
 
 - [#783](https://github.com/influxdata/kapacitor/pull/783): Fix panic when revoking tokens not already defined.
@@ -96,7 +120,7 @@ stream
 - [#662](https://github.com/influxdata/kapacitor/pull/662): Add `-skipVerify` flag to `kapacitor` CLI tool to skip SSL verification.
 - [#680](https://github.com/influxdata/kapacitor/pull/680): Add Telegram Alerting option
 - [#46](https://github.com/influxdata/kapacitor/issues/46): Can now create combinations of points within the same stream.
-    This is kind of like join but instead joining a stream with itself.
+    This is kind of like join but instead joining a stream with itself.
 - [#669](https://github.com/influxdata/kapacitor/pull/669): Add size function for humanize byte size. thanks @jsvisa!
 - [#697](https://github.com/influxdata/kapacitor/pull/697): Can now flatten a set of points into a single points creating dynamcially named fields.
 - [#698](https://github.com/influxdata/kapacitor/pull/698): Join delimiter can be specified.
@@ -110,7 +134,7 @@ stream
 - [#627](https://github.com/influxdata/kapacitor/issues/627): Fix where InfluxQL functions that returned a batch could drop tags.
 - [#674](https://github.com/influxdata/kapacitor/issues/674): Fix panic with Join On and batches.
 - [#665](https://github.com/influxdata/kapacitor/issues/665): BREAKING: Fix file mode not being correct for Alert.Log files.
-    Breaking change is that integers numbers prefixed with a 0 in TICKscript are interpreted as octal numbers.
+    Breaking change is that integers numbers prefixed with a 0 in TICKscript are interpreted as octal numbers.
 - [#667](https://github.com/influxdata/kapacitor/issues/667): Align deadman timestamps to interval.
 
 ## v1.0.0-beta2 [2016-06-17]
diff --git a/alert.go b/alert.go
index 876dc9e81..3472dbe61 100644
--- a/alert.go
+++ b/alert.go
@@ -116,6 +116,9 @@ type AlertNode struct {
 	critsTriggered *expvar.Int
 
 	bufPool sync.Pool
+
+	levelResets  []stateful.Expression
+	lrScopePools []stateful.ScopePool
 }
 
 // Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
@@ -310,6 +313,9 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
 	an.levels = make([]stateful.Expression, CritAlert+1)
 	an.scopePools = make([]stateful.ScopePool, CritAlert+1)
 
+	an.levelResets = make([]stateful.Expression, CritAlert+1)
+	an.lrScopePools = make([]stateful.ScopePool, CritAlert+1)
+
 	if n.Info != nil {
 		statefulExpression, expressionCompileError := stateful.NewExpression(n.Info.Expression)
 		if expressionCompileError != nil {
@@ -318,6 +324,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
 		}
 		an.levels[InfoAlert] = statefulExpression
 		an.scopePools[InfoAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Info.Expression))
+		if n.InfoReset != nil {
+			lstatefulExpression, lexpressionCompileError := stateful.NewExpression(n.InfoReset.Expression)
+			if lexpressionCompileError != nil {
+				return nil, fmt.Errorf("Failed to compile stateful expression for infoReset: %s", lexpressionCompileError)
+			}
+			an.levelResets[InfoAlert] = lstatefulExpression
+			an.lrScopePools[InfoAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.InfoReset.Expression))
+		}
 	}
 
 	if n.Warn != nil {
@@ -327,6 +341,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
 		}
 		an.levels[WarnAlert] = statefulExpression
 		an.scopePools[WarnAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Warn.Expression))
+		if n.WarnReset != nil {
+			lstatefulExpression, lexpressionCompileError := stateful.NewExpression(n.WarnReset.Expression)
+			if lexpressionCompileError != nil {
+				return nil, fmt.Errorf("Failed to compile stateful expression for warnReset: %s", lexpressionCompileError)
+			}
+			an.levelResets[WarnAlert] = lstatefulExpression
+			an.lrScopePools[WarnAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.WarnReset.Expression))
+		}
 	}
 
 	if n.Crit != nil {
@@ -336,6 +358,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
 		}
 		an.levels[CritAlert] = statefulExpression
 		an.scopePools[CritAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Crit.Expression))
+		if n.CritReset != nil {
+			lstatefulExpression, lexpressionCompileError := stateful.NewExpression(n.CritReset.Expression)
+			if lexpressionCompileError != nil {
+				return nil, fmt.Errorf("Failed to compile stateful expression for critReset: %s", lexpressionCompileError)
+			}
+			an.levelResets[CritAlert] = lstatefulExpression
+			an.lrScopePools[CritAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.CritReset.Expression))
+		}
 	}
 
 	// Setup states
@@ -374,7 +404,11 @@ func (a *AlertNode) runAlert([]byte) error {
 	case pipeline.StreamEdge:
 		for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
 			a.timer.Start()
-			l := a.determineLevel(p.Time, p.Fields, p.Tags)
+			var currentLevel AlertLevel
+			if state, ok := a.states[p.Group]; ok {
+				currentLevel = state.currentLevel()
+			}
+			l := a.determineLevel(p.Time, p.Fields, p.Tags, currentLevel)
 			state := a.updateState(p.Time, l, p.Group)
 			if (a.a.UseFlapping && state.flapping) || (a.a.IsStateChangesOnly && !state.changed && !state.expired) {
 				a.timer.Stop()
@@ -442,7 +476,11 @@ func (a *AlertNode) runAlert([]byte) error {
 			var highestPoint *models.BatchPoint
 
 			for i, p := range b.Points {
-				l := a.determineLevel(p.Time, p.Fields, p.Tags)
+				var currentLevel AlertLevel
+				if state, ok := a.states[b.Group]; ok {
+					currentLevel = state.currentLevel()
+				}
+				l := a.determineLevel(p.Time, p.Fields, p.Tags, currentLevel)
 				if l < lowestLevel {
 					lowestLevel = l
 				}
@@ -552,8 +590,28 @@ func (a *AlertNode) handleAlert(ad *AlertData) {
 	}
 }
 
-func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map[string]string) AlertLevel {
-	for l := len(a.levels) - 1; l >= 0; l-- {
+func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map[string]string, currentLevel AlertLevel) AlertLevel {
+	if higherLevel, found := a.findFirstMatchLevel(CritAlert, currentLevel-1, now, fields, tags); found {
+		return higherLevel
+	}
+	if rse := a.levelResets[currentLevel]; rse != nil {
+		if pass, err := EvalPredicate(rse, a.lrScopePools[currentLevel], now, fields, tags); err != nil {
+			a.logger.Printf("E! error evaluating reset expression for current level %v: %s", currentLevel, err)
+		} else if !pass {
+			return currentLevel
+		}
+	}
+	if newLevel, found := a.findFirstMatchLevel(currentLevel, OKAlert, now, fields, tags); found {
+		return newLevel
+	}
+	return OKAlert
+}
+
+func (a *AlertNode) findFirstMatchLevel(start AlertLevel, stop AlertLevel, now time.Time, fields models.Fields, tags map[string]string) (AlertLevel, bool) {
+	if stop < OKAlert {
+		stop = OKAlert
+	}
+	for l := start; l > stop; l-- {
 		se := a.levels[l]
 		if se == nil {
 			continue
@@ -562,10 +620,10 @@ func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map
 			a.logger.Printf("E! error evaluating expression for level %v: %s", AlertLevel(l), err)
 			continue
 		} else if pass {
-			return AlertLevel(l)
+			return AlertLevel(l), true
 		}
 	}
-	return OKAlert
+	return OKAlert, false
 }
 
 func (a *AlertNode) batchToResult(b models.Batch) influxql.Result {
@@ -646,6 +704,11 @@ func (a *alertState) addEvent(level AlertLevel) {
 	a.history[a.idx] = level
 }
 
+// Return current level of this state
+func (a *alertState) currentLevel() AlertLevel {
+	return a.history[a.idx]
+}
+
 // Compute the percentage change in the alert history.
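For readers skimming the diff, here is a minimal, standalone sketch of the evaluation order that the reworked `determineLevel`/`findFirstMatchLevel` pair implements. It substitutes plain float64 thresholds for Kapacitor's stateful lambda expressions; the `Level` type, the threshold maps, and the `nextLevel` and `firstMatch` functions are illustrative names only, not part of Kapacitor's API.

```go
package main

import "fmt"

type Level int

const (
	OK Level = iota
	Info
	Warn
	Crit
)

// Trigger thresholds (value > trigger raises to that level) and reset thresholds
// (value < reset allows the level to be lowered), mirroring the changelog example.
var trigger = map[Level]float64{Info: 60, Warn: 70, Crit: 80}
var reset = map[Level]float64{Info: 10, Warn: 20, Crit: 30}

// firstMatch scans trigger thresholds from start down to, but not including, stop
// and returns the first level whose trigger condition the value satisfies.
func firstMatch(start, stop Level, value float64) (Level, bool) {
	if stop < OK {
		stop = OK
	}
	for l := start; l > stop; l-- {
		if value > trigger[l] {
			return l, true
		}
	}
	return OK, false
}

// nextLevel follows the order used by the new determineLevel: a level at or above
// the current one always wins; lowering is only considered once the current
// level's reset condition passes; otherwise the current level is held.
func nextLevel(current Level, value float64) Level {
	if higher, ok := firstMatch(Crit, current-1, value); ok {
		return higher
	}
	if r, ok := reset[current]; ok && !(value < r) {
		return current
	}
	if lower, ok := firstMatch(current, OK, value); ok {
		return lower
	}
	return OK
}

func main() {
	level := OK
	for _, v := range []float64{65, 35, 5, 75, 85, 25} {
		level = nextLevel(level, v)
		fmt.Printf("value=%v -> level=%v\n", v, level)
	}
	// Prints levels 1, 1, 0, 2, 3, 0 (INFO, INFO, OK, WARNING, CRITICAL, OK):
	// 35 stays INFO because it is not below the INFO reset threshold of 10.
}
```

The important property is that a point which no longer matches the current level's trigger expression, but does not yet match its reset expression, holds the alert at its current level instead of letting it drop.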
func (a *alertState) percentChange() float64 { l := len(a.history) diff --git a/integrations/data/TestStream_Alert_WithReset_0.srpl b/integrations/data/TestStream_Alert_WithReset_0.srpl new file mode 100644 index 000000000..a0fc5379e --- /dev/null +++ b/integrations/data/TestStream_Alert_WithReset_0.srpl @@ -0,0 +1,78 @@ +dbname +rpname +cpu,type=usage,host=serverA value=45.0 0000000001 +dbname +rpname +cpu,type=usage,host=serverB value=97.1 0000000001 +dbname +rpname +disk,type=sda,host=serverB value=39.0 0000000001 +dbname +rpname +cpu,type=usage,host=serverA value=40.0 0000000002 +dbname +rpname +cpu,type=usage,host=serverB value=92.6 0000000002 +dbname +rpname +cpu,type=usage,host=serverA value=30.0 0000000003 +dbname +rpname +cpu,type=usage,host=serverB value=95.6 0000000003 +dbname +rpname +cpu,type=usage,host=serverA value=9.0 0000000004 +dbname +rpname +cpu,type=usage,host=serverB value=93.1 0000000004 +dbname +rpname +cpu,type=usage,host=serverA value=45.0 0000000005 +dbname +rpname +cpu,type=usage,host=serverB value=92.6 0000000005 +dbname +rpname +cpu,type=usage,host=serverA value=61.0 0000000006 +dbname +rpname +cpu,type=usage,host=serverB value=95.8 0000000006 +dbname +rpname +cpu,type=usage,host=serverC value=95.8 0000000006 +dbname +rpname +cpu,type=usage,host=serverA value=30.0 0000000007 +dbname +rpname +cpu,type=usage,host=serverB value=91.3 0000000007 +dbname +rpname +cpu,type=usage,host=serverA value=19.0 0000000008 +dbname +rpname +cpu,type=usage,host=serverB value=72.3 0000000008 +dbname +rpname +cpu,type=usage,host=serverA value=45.0 0000000009 +dbname +rpname +cpu,type=usage,host=serverB value=97.1 0000000009 +dbname +rpname +cpu,type=usage,host=serverA value=61.0 0000000010 +dbname +rpname +cpu,type=usage,host=serverB value=97.1 0000000010 +dbname +rpname +cpu,type=usage,host=serverA value=81.0 0000000011 +dbname +rpname +cpu,type=usage,host=serverB value=97.1 0000000011 +dbname +rpname +cpu,type=usage,host=serverA value=29.0 0000000012 +dbname +rpname +cpu,type=usage,host=serverB value=97.1 0000000012 diff --git a/integrations/data/TestStream_Alert_WithReset_1.srpl b/integrations/data/TestStream_Alert_WithReset_1.srpl new file mode 100644 index 000000000..c355c053c --- /dev/null +++ b/integrations/data/TestStream_Alert_WithReset_1.srpl @@ -0,0 +1,96 @@ +dbname +rpname +cpu,type=usage,host=serverA value=45.0 0000000001 +dbname +rpname +cpu,type=usage,host=serverB value=97.1 0000000001 +dbname +rpname +disk,type=sda,host=serverB value=39.0 0000000001 +dbname +rpname +cpu,type=usage,host=serverA value=40.0 0000000002 +dbname +rpname +cpu,type=usage,host=serverB value=92.6 0000000002 +dbname +rpname +cpu,type=usage,host=serverA value=30.0 0000000003 +dbname +rpname +cpu,type=usage,host=serverB value=95.6 0000000003 +dbname +rpname +cpu,type=usage,host=serverA value=29.0 0000000004 +dbname +rpname +cpu,type=usage,host=serverB value=93.1 0000000004 +dbname +rpname +cpu,type=usage,host=serverA value=45.0 0000000005 +dbname +rpname +cpu,type=usage,host=serverB value=92.6 0000000005 +dbname +rpname +cpu,type=usage,host=serverA value=61.0 0000000006 +dbname +rpname +cpu,type=usage,host=serverB value=95.8 0000000006 +dbname +rpname +cpu,type=usage,host=serverC value=95.8 0000000006 +dbname +rpname +cpu,type=usage,host=serverA value=49.0 0000000007 +dbname +rpname +cpu,type=usage,host=serverB value=91.3 0000000007 +dbname +rpname +cpu,type=usage,host=serverA value=29.0 0000000008 +dbname +rpname +cpu,type=usage,host=serverB value=72.3 0000000008 +dbname +rpname 
+cpu,type=usage,host=serverA value=45.0 0000000009 +dbname +rpname +cpu,type=usage,host=serverB value=97.1 0000000009 +dbname +rpname +cpu,type=usage,host=serverA value=61.0 0000000010 +dbname +rpname +cpu,type=usage,host=serverB value=73.1 0000000010 +dbname +rpname +cpu,type=usage,host=serverA value=81.0 0000000011 +dbname +rpname +cpu,type=usage,host=serverB value=28.2 0000000011 +dbname +rpname +cpu,type=usage,host=serverA value=69.0 0000000012 +dbname +rpname +cpu,type=usage,host=serverB value=77.0 0000000012 +dbname +rpname +cpu,type=usage,host=serverA value=50.0 0000000013 +dbname +rpname +cpu,type=usage,host=serverB value=43.4 0000000013 +dbname +rpname +cpu,type=usage,host=serverA value=41.0 0000000014 +dbname +rpname +cpu,type=usage,host=serverB value=32.1 0000000014 +dbname +rpname +cpu,type=usage,host=serverA value=25.0 0000000015 +dbname +rpname +cpu,type=usage,host=serverB value=92.0 0000000015 diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 63ba7372a..1530a6034 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -4349,6 +4349,748 @@ stream } } +func TestStream_Alert_WithReset_0(t *testing.T) { + requestCount := int32(0) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ad := kapacitor.AlertData{} + dec := json.NewDecoder(r.Body) + err := dec.Decode(&ad) + if err != nil { + t.Fatal(err) + } + atomic.AddInt32(&requestCount, 1) + rc := atomic.LoadInt32(&requestCount) + var expAd kapacitor.AlertData + switch rc { + case 1: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 45.0, + }}, + }, + }, + }, + } + case 2: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), + Duration: time.Second, + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), + 40.0, + }}, + }, + }, + }, + } + case 3: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + Duration: 2 * time.Second, + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 30.0, + }}, + }, + }, + }, + } + case 4: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is OK", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + Duration: 3 * time.Second, + Level: kapacitor.OKAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", 
"value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + 9.0, + }}, + }, + }, + }, + } + case 5: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + Duration: 0 * time.Second, + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 45.0, + }}, + }, + }, + }, + } + case 6: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is WARNING", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + Duration: 1 * time.Second, + Level: kapacitor.WarnAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 61.0, + }}, + }, + }, + }, + } + case 7: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is WARNING", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + Duration: 2 * time.Second, + Level: kapacitor.WarnAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 30.0, + }}, + }, + }, + }, + } + case 8: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is OK", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC), + Duration: 3 * time.Second, + Level: kapacitor.OKAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC), + 19.0, + }}, + }, + }, + }, + } + case 9: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + Duration: 0 * time.Second, + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 45.0, + }}, + }, + }, + }, + } + case 10: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is WARNING", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC), + Duration: 1 * time.Second, + Level: kapacitor.WarnAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC), + 61.0, + }}, + }, + }, + }, + } + case 11: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is CRITICAL", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 
10, 0, time.UTC), + Duration: 2 * time.Second, + Level: kapacitor.CritAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 81.0, + }}, + }, + }, + }, + } + case 12: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is OK", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC), + Duration: 3 * time.Second, + Level: kapacitor.OKAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC), + 29.0, + }}, + }, + }, + }, + } + } + + if eq, msg := compareAlertData(expAd, ad); !eq { + t.Errorf("unexpected alert data for request: %d %s", rc, msg) + } + })) + defer ts.Close() + + var script = ` +var infoThreshold = 40.0 +var warnThreshold = 60.0 +var critThreshold = 80.0 + +var infoResetThreshold = 10.0 +var warnResetThreshold = 20.0 +var critResetThreshold = 30.0 + +stream + |from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + .groupBy('host') + |alert() + .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') + .details('details') + .idField('id') + .idTag('id') + .levelField('level') + .levelTag('level') + .info(lambda: "value" > infoThreshold) + .infoReset(lambda: "value" < infoResetThreshold) + .warn(lambda: "value" > warnThreshold) + .warnReset(lambda: "value" < warnResetThreshold) + .crit(lambda: "value" > critThreshold) + .critReset(lambda: "value" < critResetThreshold) + .post('` + ts.URL + `') + |httpOut('TestStream_Alert_WithReset_0') +` + + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "level": "OK", "id": "kapacitor/cpu/serverA", "type": "usage"}, + Columns: []string{"time", "id", "level", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC), + "kapacitor/cpu/serverA", + "OK", + 29.0, + }}, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_Alert_WithReset_0", script, 13*time.Second, er, nil, false) + + if rc := atomic.LoadInt32(&requestCount); rc != 12 { + t.Errorf("got %v exp %v", rc, 1) + } +} + +func TestStream_Alert_WithReset_1(t *testing.T) { + requestCount := int32(0) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ad := kapacitor.AlertData{} + dec := json.NewDecoder(r.Body) + err := dec.Decode(&ad) + if err != nil { + t.Fatal(err) + } + atomic.AddInt32(&requestCount, 1) + rc := atomic.LoadInt32(&requestCount) + var expAd kapacitor.AlertData + switch rc { + case 1: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 45.0, + }}, + }, + }, + }, + } + case 2: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 1, 
0, time.UTC), + Duration: time.Second, + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), + 40.0, + }}, + }, + }, + }, + } + case 3: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + Duration: 2 * time.Second, + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 30.0, + }}, + }, + }, + }, + } + case 4: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is OK", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + Duration: 3 * time.Second, + Level: kapacitor.OKAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + 29.0, + }}, + }, + }, + }, + } + case 5: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + Duration: 0 * time.Second, + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 45.0, + }}, + }, + }, + }, + } + case 6: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is WARNING", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + Duration: 1 * time.Second, + Level: kapacitor.WarnAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 61.0, + }}, + }, + }, + }, + } + case 7: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + Duration: 2 * time.Second, + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 49.0, + }}, + }, + }, + }, + } + case 8: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is OK", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC), + Duration: 3 * time.Second, + Level: kapacitor.OKAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 7, 0, 
time.UTC), + 29.0, + }}, + }, + }, + }, + } + case 9: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + Duration: 0 * time.Second, + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 45.0, + }}, + }, + }, + }, + } + case 10: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is WARNING", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC), + Duration: 1 * time.Second, + Level: kapacitor.WarnAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC), + 61.0, + }}, + }, + }, + }, + } + case 11: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is CRITICAL", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + Duration: 2 * time.Second, + Level: kapacitor.CritAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 81.0, + }}, + }, + }, + }, + } + case 12: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is WARNING", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC), + Duration: 3 * time.Second, + Level: kapacitor.WarnAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC), + 69.0, + }}, + }, + }, + }, + } + case 13: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is WARNING", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC), + Duration: 4 * time.Second, + Level: kapacitor.WarnAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC), + 50.0, + }}, + }, + }, + }, + } + case 14: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is INFO", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 13, 0, time.UTC), + Duration: 5 * time.Second, + Level: kapacitor.InfoAlert, + Data: influxql.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA", "type": "usage"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 13, 0, time.UTC), + 41.0, + }}, + }, + }, + }, + } + case 15: + expAd = kapacitor.AlertData{ + ID: "kapacitor/cpu/serverA", + Message: "kapacitor/cpu/serverA is OK", + Details: "details", + Time: time.Date(1971, 1, 1, 0, 0, 14, 0, time.UTC), + Duration: 6 * time.Second, + Level: 
kapacitor.OKAlert,
+				Data: influxql.Result{
+					Series: imodels.Rows{
+						{
+							Name:    "cpu",
+							Tags:    map[string]string{"host": "serverA", "type": "usage"},
+							Columns: []string{"time", "value"},
+							Values: [][]interface{}{[]interface{}{
+								time.Date(1971, 1, 1, 0, 0, 14, 0, time.UTC),
+								25.0,
+							}},
+						},
+					},
+				},
+			}
+		}
+
+		if eq, msg := compareAlertData(expAd, ad); !eq {
+			t.Errorf("unexpected alert data for request: %d %s", rc, msg)
+		}
+	}))
+	defer ts.Close()
+
+	var script = `
+var infoThreshold = 40.0
+var warnThreshold = 60.0
+var critThreshold = 80.0
+
+var infoResetThreshold = 30.0
+var warnResetThreshold = 50.0
+var critResetThreshold = 70.0
+
+stream
+	|from()
+		.measurement('cpu')
+		.where(lambda: "host" == 'serverA')
+		.groupBy('host')
+	|alert()
+		.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
+		.details('details')
+		.idField('id')
+		.idTag('id')
+		.levelField('level')
+		.levelTag('level')
+		.info(lambda: "value" > infoThreshold)
+		.infoReset(lambda: "value" < infoResetThreshold)
+		.warn(lambda: "value" > warnThreshold)
+		.warnReset(lambda: "value" < warnResetThreshold)
+		.crit(lambda: "value" > critThreshold)
+		.critReset(lambda: "value" < critResetThreshold)
+		.post('` + ts.URL + `')
+	|httpOut('TestStream_Alert_WithReset_1')
+`
+
+	er := kapacitor.Result{
+		Series: imodels.Rows{
+			{
+				Name:    "cpu",
+				Tags:    map[string]string{"host": "serverA", "level": "OK", "id": "kapacitor/cpu/serverA", "type": "usage"},
+				Columns: []string{"time", "id", "level", "value"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 14, 0, time.UTC),
+					"kapacitor/cpu/serverA",
+					"OK",
+					25.0,
+				}},
+			},
+		},
+	}
+
+	testStreamerWithOutput(t, "TestStream_Alert_WithReset_1", script, 15*time.Second, er, nil, false)
+
+	if rc := atomic.LoadInt32(&requestCount); rc != 15 {
+		t.Errorf("got %v exp %v", rc, 15)
+	}
+}
+
 func TestStream_AlertDuration(t *testing.T) {
 	requestCount := int32(0)
 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
diff --git a/pipeline/alert.go b/pipeline/alert.go
index b4dbefd06..532a3b22c 100644
--- a/pipeline/alert.go
+++ b/pipeline/alert.go
@@ -87,6 +87,23 @@ const defaultLogFileMode = 0600
 // For each point an expression may or may not be evaluated.
 // If no expression is true then the alert is considered to be in the OK state.
 //
+// Kapacitor supports alert reset expressions.
+// Once an alert enters a given state, it can only be lowered to a less severe state after its reset expression evaluates to true.
+//
+// Example:
+//    stream
+//        |from()
+//            .measurement('cpu')
+//            .where(lambda: "host" == 'serverA')
+//            .groupBy('host')
+//        |alert()
+//            .info(lambda: "value" > 60)
+//            .infoReset(lambda: "value" < 10)
+//            .warn(lambda: "value" > 70)
+//            .warnReset(lambda: "value" < 20)
+//            .crit(lambda: "value" > 80)
+//            .critReset(lambda: "value" < 30)
+//
 // Available Statistics:
 //
 //    * alerts_triggered -- Total number of alerts triggered
@@ -206,6 +223,13 @@ type AlertNode struct {
 	// An empty value indicates the level is invalid and is skipped.
 	Crit *ast.LambdaNode
 
+	// Filter expression for resetting the INFO alert level to a lower level.
+	InfoReset *ast.LambdaNode
+	// Filter expression for resetting the WARNING alert level to a lower level.
+	WarnReset *ast.LambdaNode
+	// Filter expression for resetting the CRITICAL alert level to a lower level.
+	CritReset *ast.LambdaNode
+
 	//tick:ignore
 	UseFlapping bool `tick:"Flapping"`
 	//tick:ignore
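As a concrete reading of the example in the doc comment above: with `.warn(lambda: "value" > 70)` and `.warnReset(lambda: "value" < 20)`, a sequence of values 75, 50, 15 raises the alert to WARNING at 75, holds it at WARNING at 50 (the warn expression no longer matches, but the reset expression has not yet passed), and lowers it only at 15, in this case all the way to OK since the INFO expression does not match either. Without `warnReset`, the 50-value point would already have dropped the alert to OK.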