Resetting alert levels #740

Merged 15 commits on Aug 8, 2016
28 changes: 26 additions & 2 deletions CHANGELOG.md
@@ -4,8 +4,32 @@

### Release Notes

#### Alert reset expressions

Kapacitor now supports alert reset expressions.
A reset expression keeps an alert from being lowered in severity until the reset expression for its current level evaluates to true.

Example:

```go
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|alert()
.info(lambda: "value" > 60)
.infoReset(lambda: "value" < 10)
.warn(lambda: "value" > 70)
.warnReset(lambda: "value" < 20)
.crit(lambda: "value" > 80)
.critReset(lambda: "value" < 30)
```
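
The behavior described above can be sketched in plain Go. This is only an illustrative model, not Kapacitor's code: the `Level`, `rule`, `firstMatch`, and `nextLevel` names are hypothetical, and the thresholds are copied from the TICKscript example. It assumes the evaluation order used in this pull request: first try the current level or anything higher, then hold the current level unless its reset expression passes, then fall to the highest lower level that still matches.

```go
package main

import "fmt"

// Level is a hypothetical stand-in for Kapacitor's AlertLevel.
type Level int

const (
	OK Level = iota
	Info
	Warn
	Crit
)

func (l Level) String() string {
	return [...]string{"OK", "INFO", "WARNING", "CRITICAL"}[l]
}

// rule pairs a trigger predicate with its reset predicate (illustrative type).
type rule struct {
	trigger func(v float64) bool
	reset   func(v float64) bool
}

// rules mirrors the thresholds of the TICKscript example.
var rules = map[Level]rule{
	Info: {func(v float64) bool { return v > 60 }, func(v float64) bool { return v < 10 }},
	Warn: {func(v float64) bool { return v > 70 }, func(v float64) bool { return v < 20 }},
	Crit: {func(v float64) bool { return v > 80 }, func(v float64) bool { return v < 30 }},
}

// firstMatch returns the highest level between hi and lo (inclusive,
// never OK) whose trigger matches v.
func firstMatch(hi, lo Level, v float64) (Level, bool) {
	for l := hi; l >= lo && l > OK; l-- {
		if r, ok := rules[l]; ok && r.trigger(v) {
			return l, true
		}
	}
	return OK, false
}

// nextLevel computes the new level from the current level and a value.
func nextLevel(current Level, v float64) Level {
	// 1. The alert may always stay at its level or rise.
	if l, ok := firstMatch(Crit, current, v); ok {
		return l
	}
	// 2. Hold the current level until its reset predicate passes.
	if r, ok := rules[current]; ok && r.reset != nil && !r.reset(v) {
		return current
	}
	// 3. Otherwise drop to the highest lower level that matches, or OK.
	l, _ := firstMatch(current, Info, v)
	return l
}

func main() {
	level := OK
	for _, v := range []float64{45, 61, 30, 9, 85, 50, 25} {
		level = nextLevel(level, v)
		fmt.Printf("value=%v level=%s\n", v, level)
	}
}
```

With these made-up input values the level sequence is OK, INFO, INFO, OK, CRITICAL, CRITICAL, OK: the value 30 does not clear the INFO alert because it is not below 10, and the value 50 does not clear the CRITICAL alert because it is not below 30.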

### Features

- [#740](https://github.com/influxdata/kapacitor/pull/740): Support reset expressions to prevent an alert from being lowered in severity

### Bugfixes

- [#783](https://github.com/influxdata/kapacitor/pull/783): Fix panic when revoking tokens not already defined.
@@ -96,7 +120,7 @@ stream
- [#662](https://github.com/influxdata/kapacitor/pull/662): Add `-skipVerify` flag to `kapacitor` CLI tool to skip SSL verification.
- [#680](https://github.com/influxdata/kapacitor/pull/680): Add Telegram Alerting option
- [#46](https://github.com/influxdata/kapacitor/issues/46): Can now create combinations of points within the same stream.
This is similar to a join, but joins a stream with itself.
- [#669](https://github.com/influxdata/kapacitor/pull/669): Add size function to humanize byte sizes. Thanks @jsvisa!
- [#697](https://github.com/influxdata/kapacitor/pull/697): Can now flatten a set of points into a single point, creating dynamically named fields.
- [#698](https://github.com/influxdata/kapacitor/pull/698): Join delimiter can be specified.
@@ -110,7 +134,7 @@ stream
- [#627](https://github.com/influxdata/kapacitor/issues/627): Fix where InfluxQL functions that returned a batch could drop tags.
- [#674](https://github.com/influxdata/kapacitor/issues/674): Fix panic with Join On and batches.
- [#665](https://github.com/influxdata/kapacitor/issues/665): BREAKING: Fix file mode not being correct for Alert.Log files.
The breaking change is that integers prefixed with a 0 in TICKscript are now interpreted as octal numbers.
- [#667](https://github.com/influxdata/kapacitor/issues/667): Align deadman timestamps to interval.

## v1.0.0-beta2 [2016-06-17]
75 changes: 69 additions & 6 deletions alert.go
@@ -116,6 +116,9 @@ type AlertNode struct {
critsTriggered *expvar.Int

bufPool sync.Pool

levelResets []stateful.Expression
lrScopePools []stateful.ScopePool
}

// Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
@@ -310,6 +313,9 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
an.levels = make([]stateful.Expression, CritAlert+1)
an.scopePools = make([]stateful.ScopePool, CritAlert+1)

an.levelResets = make([]stateful.Expression, CritAlert+1)
an.lrScopePools = make([]stateful.ScopePool, CritAlert+1)

if n.Info != nil {
statefulExpression, expressionCompileError := stateful.NewExpression(n.Info.Expression)
if expressionCompileError != nil {
@@ -318,6 +324,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *

an.levels[InfoAlert] = statefulExpression
an.scopePools[InfoAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Info.Expression))
if n.InfoReset != nil {
lstatefulExpression, lexpressionCompileError := stateful.NewExpression(n.InfoReset.Expression)
if lexpressionCompileError != nil {
return nil, fmt.Errorf("Failed to compile stateful expression for infoReset: %s", lexpressionCompileError)
}
an.levelResets[InfoAlert] = lstatefulExpression
an.lrScopePools[InfoAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.InfoReset.Expression))
}
}

if n.Warn != nil {
@@ -327,6 +341,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}
an.levels[WarnAlert] = statefulExpression
an.scopePools[WarnAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Warn.Expression))
if n.WarnReset != nil {
lstatefulExpression, lexpressionCompileError := stateful.NewExpression(n.WarnReset.Expression)
if lexpressionCompileError != nil {
return nil, fmt.Errorf("Failed to compile stateful expression for warnReset: %s", lexpressionCompileError)
}
an.levelResets[WarnAlert] = lstatefulExpression
an.lrScopePools[WarnAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.WarnReset.Expression))
}
}

if n.Crit != nil {
@@ -336,6 +358,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}
an.levels[CritAlert] = statefulExpression
an.scopePools[CritAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.Crit.Expression))
@nathanielc (Contributor), Jul 26, 2016:

You will need separate scopePools for the reset expressions.

A scope pool simply reuses existing scope objects for each evaluation of the expression, which helps evaluation remain allocation-free. There is no expectation that a reset expression will use exactly the same fields and tags as the normal expression, so it needs its own scope pool.

Contributor Author:

Yeah, at first I thought the reset expression was kind of the "opposite" of the normal expression, so it could use the same fields and tags from the normal expression's scope pool. I'll update this.
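
To illustrate the review point, here is a minimal sketch of a scope pool built on `sync.Pool`. It is not Kapacitor's `stateful.ScopePool`; the `scope` and `scopePool` types and the `fill` method are hypothetical, and the `idle` field is invented for the example. The idea it shows is that each pool is created from the variables one expression references, so a reset expression that reads different fields or tags cannot safely borrow the trigger expression's pool.

```go
package main

import (
	"fmt"
	"sync"
)

// scope is a hypothetical, simplified variable scope: name -> value.
type scope map[string]interface{}

// scopePool hands out reusable scopes for a single expression.
type scopePool struct {
	refs []string // variables the expression references
	pool sync.Pool
}

func newScopePool(refs []string) *scopePool {
	p := &scopePool{refs: refs}
	p.pool.New = func() interface{} { return make(scope, len(refs)) }
	return p
}

// Get returns a scope, reusing a previously returned one when available.
func (p *scopePool) Get() scope { return p.pool.Get().(scope) }

// Put hands a scope back to the pool for later reuse.
func (p *scopePool) Put(s scope) { p.pool.Put(s) }

// fill copies only the referenced variables from a point into the scope.
func (p *scopePool) fill(s scope, fields map[string]interface{}, tags map[string]string) error {
	for _, name := range p.refs {
		if v, ok := fields[name]; ok {
			s[name] = v
			continue
		}
		if v, ok := tags[name]; ok {
			s[name] = v
			continue
		}
		return fmt.Errorf("no field or tag named %q", name)
	}
	return nil
}

func main() {
	// Assumed expressions: crit reads "value"; critReset reads "idle" and "host".
	critPool := newScopePool([]string{"value"})
	critResetPool := newScopePool([]string{"idle", "host"})

	fields := map[string]interface{}{"value": 85.0, "idle": 5.0}
	tags := map[string]string{"host": "serverA"}

	s := critPool.Get()
	if err := critPool.fill(s, fields, tags); err != nil {
		fmt.Println("crit:", err)
	}
	fmt.Println("crit scope size:", len(s))
	critPool.Put(s)

	r := critResetPool.Get()
	if err := critResetPool.fill(r, fields, tags); err != nil {
		fmt.Println("critReset:", err)
	}
	fmt.Println("critReset scope size:", len(r))
	critResetPool.Put(r)
}
```

A scope taken from `critPool` only ever holds `value`, so evaluating the reset expression against it would find neither `idle` nor `host`.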

if n.CritReset != nil {
lstatefulExpression, lexpressionCompileError := stateful.NewExpression(n.CritReset.Expression)
if lexpressionCompileError != nil {
return nil, fmt.Errorf("Failed to compile stateful expression for critReset: %s", lexpressionCompileError)
}
an.levelResets[CritAlert] = lstatefulExpression
an.lrScopePools[CritAlert] = stateful.NewScopePool(stateful.FindReferenceVariables(n.CritReset.Expression))
}
}

// Setup states
@@ -374,7 +404,11 @@ func (a *AlertNode) runAlert([]byte) error {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
a.timer.Start()
l := a.determineLevel(p.Time, p.Fields, p.Tags)
var currentLevel AlertLevel
if state, ok := a.states[p.Group]; ok {
currentLevel = state.currentLevel()
}
l := a.determineLevel(p.Time, p.Fields, p.Tags, currentLevel)
state := a.updateState(p.Time, l, p.Group)
if (a.a.UseFlapping && state.flapping) || (a.a.IsStateChangesOnly && !state.changed && !state.expired) {
a.timer.Stop()
@@ -442,7 +476,11 @@ func (a *AlertNode) runAlert([]byte) error {
var highestPoint *models.BatchPoint

for i, p := range b.Points {
l := a.determineLevel(p.Time, p.Fields, p.Tags)
var currentLevel AlertLevel
if state, ok := a.states[b.Group]; ok {
currentLevel = state.currentLevel()
}
l := a.determineLevel(p.Time, p.Fields, p.Tags, currentLevel)
if l < lowestLevel {
lowestLevel = l
}
@@ -552,8 +590,28 @@ func (a *AlertNode) handleAlert(ad *AlertData) {
}
}

func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map[string]string) AlertLevel {
for l := len(a.levels) - 1; l >= 0; l-- {
func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map[string]string, currentLevel AlertLevel) AlertLevel {
if higherLevel, found := a.findFirstMatchLevel(CritAlert, currentLevel-1, now, fields, tags); found {
return higherLevel
}
if rse := a.levelResets[currentLevel]; rse != nil {
if pass, err := EvalPredicate(rse, a.lrScopePools[currentLevel], now, fields, tags); err != nil {
a.logger.Printf("E! error evaluating reset expression for current level %v: %s", currentLevel, err)
} else if !pass {
return currentLevel
}
}
if newLevel, found := a.findFirstMatchLevel(currentLevel, OKAlert, now, fields, tags); found {
return newLevel
}
return OKAlert
}

func (a *AlertNode) findFirstMatchLevel(start AlertLevel, stop AlertLevel, now time.Time, fields models.Fields, tags map[string]string) (AlertLevel, bool) {
if stop < OKAlert {
stop = OKAlert
}
for l := start; l > stop; l-- {
se := a.levels[l]
if se == nil {
continue
@@ -562,10 +620,10 @@ func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map
a.logger.Printf("E! error evaluating expression for level %v: %s", AlertLevel(l), err)
continue
} else if pass {
return AlertLevel(l)
return AlertLevel(l), true
}
}
return OKAlert
return OKAlert, false
}

func (a *AlertNode) batchToResult(b models.Batch) influxql.Result {
@@ -646,6 +704,11 @@ func (a *alertState) addEvent(level AlertLevel) {
a.history[a.idx] = level
}

// Return current level of this state
func (a *alertState) currentLevel() AlertLevel {
return a.history[a.idx]
}

// Compute the percentage change in the alert history.
func (a *alertState) percentChange() float64 {
l := len(a.history)
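
The new `currentLevel` accessor reads the most recent entry of the alert history, and `runAlert` falls back to the zero value when a group has no state yet. The sketch below models that pattern under stated assumptions: the `state` type is a simplified stand-in for `alertState`, the wrap-around index update in `addEvent` is assumed rather than taken from the diff, and groups are keyed by plain strings.

```go
package main

import "fmt"

// Level is a hypothetical stand-in for AlertLevel; its zero value is OK.
type Level int

const (
	OK Level = iota
	Info
	Warn
	Crit
)

// state is a simplified, hypothetical version of alertState.
type state struct {
	history []Level // fixed-size ring buffer of recent levels
	idx     int     // position of the most recent entry
}

func newState(size int) *state {
	return &state{history: make([]Level, size)}
}

// addEvent advances the index, wrapping around, and records the level.
// The modulo update is an assumption made for this sketch.
func (s *state) addEvent(l Level) {
	s.idx = (s.idx + 1) % len(s.history)
	s.history[s.idx] = l
}

// currentLevel returns the most recently recorded level.
func (s *state) currentLevel() Level {
	return s.history[s.idx]
}

// currentFor mirrors the lookup in runAlert: a group without any state
// yields the zero value, which is OK.
func currentFor(states map[string]*state, group string) Level {
	var current Level
	if st, ok := states[group]; ok {
		current = st.currentLevel()
	}
	return current
}

func main() {
	states := map[string]*state{"host=serverA": newState(3)}
	for _, l := range []Level{Info, Warn, Crit, Warn} {
		states["host=serverA"].addEvent(l)
	}
	fmt.Println(currentFor(states, "host=serverA") == Warn)
	fmt.Println(currentFor(states, "host=serverB") == OK)
}
```

Because an unseen group starts at OK, its first point is evaluated against every level from critical downward, which matches the behavior before reset expressions existed.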
78 changes: 78 additions & 0 deletions integrations/data/TestStream_Alert_WithReset_0.srpl
@@ -0,0 +1,78 @@
dbname
rpname
cpu,type=usage,host=serverA value=45.0 0000000001
dbname
rpname
cpu,type=usage,host=serverB value=97.1 0000000001
dbname
rpname
disk,type=sda,host=serverB value=39.0 0000000001
dbname
rpname
cpu,type=usage,host=serverA value=40.0 0000000002
dbname
rpname
cpu,type=usage,host=serverB value=92.6 0000000002
dbname
rpname
cpu,type=usage,host=serverA value=30.0 0000000003
dbname
rpname
cpu,type=usage,host=serverB value=95.6 0000000003
dbname
rpname
cpu,type=usage,host=serverA value=9.0 0000000004
dbname
rpname
cpu,type=usage,host=serverB value=93.1 0000000004
dbname
rpname
cpu,type=usage,host=serverA value=45.0 0000000005
dbname
rpname
cpu,type=usage,host=serverB value=92.6 0000000005
dbname
rpname
cpu,type=usage,host=serverA value=61.0 0000000006
dbname
rpname
cpu,type=usage,host=serverB value=95.8 0000000006
dbname
rpname
cpu,type=usage,host=serverC value=95.8 0000000006
dbname
rpname
cpu,type=usage,host=serverA value=30.0 0000000007
dbname
rpname
cpu,type=usage,host=serverB value=91.3 0000000007
dbname
rpname
cpu,type=usage,host=serverA value=19.0 0000000008
dbname
rpname
cpu,type=usage,host=serverB value=72.3 0000000008
dbname
rpname
cpu,type=usage,host=serverA value=45.0 0000000009
dbname
rpname
cpu,type=usage,host=serverB value=97.1 0000000009
dbname
rpname
cpu,type=usage,host=serverA value=61.0 0000000010
dbname
rpname
cpu,type=usage,host=serverB value=97.1 0000000010
dbname
rpname
cpu,type=usage,host=serverA value=81.0 0000000011
dbname
rpname
cpu,type=usage,host=serverB value=97.1 0000000011
dbname
rpname
cpu,type=usage,host=serverA value=29.0 0000000012
dbname
rpname
cpu,type=usage,host=serverB value=97.1 0000000012
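
Each record in the replay file above spans three lines: a database name, a retention policy name, and one point in line protocol. The following Go sketch parses one such record. It is a deliberately simplified reader written for this illustration, not the parser Kapacitor uses: `point` and `parseRecord` are hypothetical names, it assumes exactly one float field called `value`, and it ignores escaping and other line protocol features. The timestamp is kept as a raw integer because its unit depends on the recording precision, which this file does not show.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// point is a hypothetical, simplified view of one replay record.
type point struct {
	DB, RP      string
	Measurement string
	Tags        map[string]string
	Value       float64
	Time        int64
}

// parseRecord parses a database name, retention policy name, and one
// line of the form "measurement,tag=v,... value=<float> <timestamp>".
func parseRecord(db, rp, line string) (point, error) {
	parts := strings.Fields(line)
	if len(parts) != 3 {
		return point{}, fmt.Errorf("expected 3 space-separated parts, got %d", len(parts))
	}

	// First part: measurement name followed by comma-separated tags.
	keys := strings.Split(parts[0], ",")
	p := point{DB: db, RP: rp, Measurement: keys[0], Tags: make(map[string]string)}
	for _, kv := range keys[1:] {
		pair := strings.SplitN(kv, "=", 2)
		if len(pair) != 2 {
			return point{}, fmt.Errorf("malformed tag %q", kv)
		}
		p.Tags[pair[0]] = pair[1]
	}

	// Second part: the single "value" field this sketch supports.
	field := strings.SplitN(parts[1], "=", 2)
	if len(field) != 2 || field[0] != "value" {
		return point{}, fmt.Errorf("expected a single value field, got %q", parts[1])
	}
	var err error
	if p.Value, err = strconv.ParseFloat(field[1], 64); err != nil {
		return point{}, err
	}

	// Third part: the integer timestamp (leading zeros are accepted).
	if p.Time, err = strconv.ParseInt(parts[2], 10, 64); err != nil {
		return point{}, err
	}
	return p, nil
}

func main() {
	p, err := parseRecord("dbname", "rpname", "cpu,type=usage,host=serverA value=45.0 0000000001")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("%s host=%s value=%v t=%d\n", p.Measurement, p.Tags["host"], p.Value, p.Time)
}
```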
96 changes: 96 additions & 0 deletions integrations/data/TestStream_Alert_WithReset_1.srpl
@@ -0,0 +1,96 @@
dbname
rpname
cpu,type=usage,host=serverA value=45.0 0000000001
dbname
rpname
cpu,type=usage,host=serverB value=97.1 0000000001
dbname
rpname
disk,type=sda,host=serverB value=39.0 0000000001
dbname
rpname
cpu,type=usage,host=serverA value=40.0 0000000002
dbname
rpname
cpu,type=usage,host=serverB value=92.6 0000000002
dbname
rpname
cpu,type=usage,host=serverA value=30.0 0000000003
dbname
rpname
cpu,type=usage,host=serverB value=95.6 0000000003
dbname
rpname
cpu,type=usage,host=serverA value=29.0 0000000004
dbname
rpname
cpu,type=usage,host=serverB value=93.1 0000000004
dbname
rpname
cpu,type=usage,host=serverA value=45.0 0000000005
dbname
rpname
cpu,type=usage,host=serverB value=92.6 0000000005
dbname
rpname
cpu,type=usage,host=serverA value=61.0 0000000006
dbname
rpname
cpu,type=usage,host=serverB value=95.8 0000000006
dbname
rpname
cpu,type=usage,host=serverC value=95.8 0000000006
dbname
rpname
cpu,type=usage,host=serverA value=49.0 0000000007
dbname
rpname
cpu,type=usage,host=serverB value=91.3 0000000007
dbname
rpname
cpu,type=usage,host=serverA value=29.0 0000000008
dbname
rpname
cpu,type=usage,host=serverB value=72.3 0000000008
dbname
rpname
cpu,type=usage,host=serverA value=45.0 0000000009
dbname
rpname
cpu,type=usage,host=serverB value=97.1 0000000009
dbname
rpname
cpu,type=usage,host=serverA value=61.0 0000000010
dbname
rpname
cpu,type=usage,host=serverB value=73.1 0000000010
dbname
rpname
cpu,type=usage,host=serverA value=81.0 0000000011
dbname
rpname
cpu,type=usage,host=serverB value=28.2 0000000011
dbname
rpname
cpu,type=usage,host=serverA value=69.0 0000000012
dbname
rpname
cpu,type=usage,host=serverB value=77.0 0000000012
dbname
rpname
cpu,type=usage,host=serverA value=50.0 0000000013
dbname
rpname
cpu,type=usage,host=serverB value=43.4 0000000013
dbname
rpname
cpu,type=usage,host=serverA value=41.0 0000000014
dbname
rpname
cpu,type=usage,host=serverB value=32.1 0000000014
dbname
rpname
cpu,type=usage,host=serverA value=25.0 0000000015
dbname
rpname
cpu,type=usage,host=serverB value=92.0 0000000015