Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "WIP: Add cardinality stat to each node type." #1240

Merged
merged 1 commit into from
Mar 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
Renamed `eval_errors` to `errors` in eval node.
- [#922](https://github.com/influxdata/kapacitor/issues/922): Expose server specific information in alert templates.
- [#1162](https://github.com/influxdata/kapacitor/pulls/1162): Add Pushover integration.
- [#1221](https://github.com/influxdata/kapacitor/pull/1221): Add `working_cardinality` stat to each node type that tracks the number of groups per node.

### Bugfixes

Expand Down
25 changes: 3 additions & 22 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ type AlertNode struct {
messageTmpl *text.Template
detailsTmpl *html.Template

statesMu sync.RWMutex

alertsTriggered *expvar.Int
oksTriggered *expvar.Int
infosTriggered *expvar.Int
Expand Down Expand Up @@ -450,14 +448,6 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

func (a *AlertNode) runAlert([]byte) error {
valueF := func() int64 {
a.statesMu.RLock()
l := len(a.states)
a.statesMu.RUnlock()
return int64(l)
}
a.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

// Register delete hook
if a.hasAnonTopic() {
a.et.tm.registerDeleteHookForTask(a.et.Task.ID, deleteAlertHook(a.anonTopic))
Expand Down Expand Up @@ -498,7 +488,7 @@ func (a *AlertNode) runAlert([]byte) error {
return err
}
var currentLevel alert.Level
if state, ok := a.getAlertState(p.Group); ok {
if state, ok := a.states[p.Group]; ok {
currentLevel = state.currentLevel()
} else {
// Check for previous state
Expand Down Expand Up @@ -590,7 +580,7 @@ func (a *AlertNode) runAlert([]byte) error {
var highestPoint *models.BatchPoint

var currentLevel alert.Level
if state, ok := a.getAlertState(b.Group); ok {
if state, ok := a.states[b.Group]; ok {
currentLevel = state.currentLevel()
} else {
// Check for previous state
Expand Down Expand Up @@ -944,14 +934,12 @@ func (a *alertState) percentChange() float64 {
}

func (a *AlertNode) updateState(t time.Time, level alert.Level, group models.GroupID) *alertState {
state, ok := a.getAlertState(group)
state, ok := a.states[group]
if !ok {
state = &alertState{
history: make([]alert.Level, a.a.History),
}
a.statesMu.Lock()
a.states[group] = state
a.statesMu.Unlock()
}
state.addEvent(level)

Expand Down Expand Up @@ -1086,10 +1074,3 @@ func (a *AlertNode) renderMessageAndDetails(id, name string, t time.Time, group
details := tmpBuffer.String()
return msg, details, nil
}

func (a *AlertNode) getAlertState(id models.GroupID) (state *alertState, ok bool) {
a.statesMu.RLock()
state, ok = a.states[id]
a.statesMu.RUnlock()
return state, ok
}
17 changes: 0 additions & 17 deletions combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"fmt"
"log"
"sort"
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/stateful"
Expand All @@ -21,8 +19,6 @@ type CombineNode struct {
expressionsByGroup map[models.GroupID][]stateful.Expression
scopePools []stateful.ScopePool

expressionsByGroupMu sync.RWMutex

combination combination
}

Expand All @@ -34,7 +30,6 @@ func newCombineNode(et *ExecutingTask, n *pipeline.CombineNode, l *log.Logger) (
expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
combination: combination{max: n.Max},
}

// Create stateful expressions
cn.expressions = make([]stateful.Expression, len(n.Lambdas))
cn.scopePools = make([]stateful.ScopePool, len(n.Lambdas))
Expand Down Expand Up @@ -65,14 +60,6 @@ func (t timeList) Less(i, j int) bool { return t[i].Before(t[j]) }
func (t timeList) Swap(i, j int) { t[i], t[j] = t[j], t[i] }

func (n *CombineNode) runCombine([]byte) error {
valueF := func() int64 {
n.expressionsByGroupMu.RLock()
l := len(n.expressionsByGroup)
n.expressionsByGroupMu.RUnlock()
return int64(l)
}
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch n.Wants() {
case pipeline.StreamEdge:
buffers := make(map[models.GroupID]*buffer)
Expand Down Expand Up @@ -175,17 +162,13 @@ func (n *CombineNode) combineBuffer(buf *buffer) error {
return nil
}
l := len(n.expressions)
n.expressionsByGroupMu.RLock()
expressions, ok := n.expressionsByGroup[buf.Group]
n.expressionsByGroupMu.RUnlock()
if !ok {
expressions = make([]stateful.Expression, l)
for i, expr := range n.expressions {
expressions[i] = expr.CopyReset()
}
n.expressionsByGroupMu.Lock()
n.expressionsByGroup[buf.Group] = expressions
n.expressionsByGroupMu.Unlock()
}

// Compute matching result for all points
Expand Down
17 changes: 0 additions & 17 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package kapacitor

import (
"log"
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand All @@ -29,25 +27,12 @@ func newDerivativeNode(et *ExecutingTask, n *pipeline.DerivativeNode, l *log.Log
func (d *DerivativeNode) runDerivative([]byte) error {
switch d.Provides() {
case pipeline.StreamEdge:
var mu sync.RWMutex
previous := make(map[models.GroupID]models.Point)
valueF := func() int64 {
mu.RLock()
l := len(previous)
mu.RUnlock()
return int64(l)
}
d.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() {
d.timer.Start()
mu.RLock()
pr, ok := previous[p.Group]
mu.RUnlock()
if !ok {
mu.Lock()
previous[p.Group] = p
mu.Unlock()
d.timer.Stop()
continue
}
Expand All @@ -66,9 +51,7 @@ func (d *DerivativeNode) runDerivative([]byte) error {
}
d.timer.Resume()
}
mu.Lock()
previous[p.Group] = p
mu.Unlock()
d.timer.Stop()
}
case pipeline.BatchEdge:
Expand Down
19 changes: 0 additions & 19 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"errors"
"fmt"
"log"
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
Expand All @@ -22,10 +20,6 @@ type EvalNode struct {
refVarList [][]string
scopePool stateful.ScopePool
tags map[string]bool

expressionsByGroupMu sync.RWMutex

evalErrors *expvar.Int
}

// Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point.
Expand All @@ -38,7 +32,6 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
e: n,
expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
}

// Create stateful expressions
en.expressions = make([]stateful.Expression, len(n.Lambdas))
en.refVarList = make([][]string, len(n.Lambdas))
Expand Down Expand Up @@ -69,14 +62,6 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
}

func (e *EvalNode) runEval(snapshot []byte) error {
valueF := func() int64 {
e.expressionsByGroupMu.RLock()
l := len(e.expressionsByGroup)
e.expressionsByGroupMu.RUnlock()
return int64(l)
}
e.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch e.Provides() {
case pipeline.StreamEdge:
var err error
Expand Down Expand Up @@ -133,17 +118,13 @@ func (e *EvalNode) runEval(snapshot []byte) error {
func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Fields, tags models.Tags) (models.Fields, models.Tags, error) {
vars := e.scopePool.Get()
defer e.scopePool.Put(vars)
e.expressionsByGroupMu.RLock()
expressions, ok := e.expressionsByGroup[group]
e.expressionsByGroupMu.RUnlock()
if !ok {
expressions = make([]stateful.Expression, len(e.expressions))
for i, exp := range e.expressions {
expressions[i] = exp.CopyReset()
}
e.expressionsByGroupMu.Lock()
e.expressionsByGroup[group] = expressions
e.expressionsByGroupMu.Unlock()
}
for i, expr := range expressions {
err := fillScope(vars, e.refVarList[i], now, fields, tags)
Expand Down
23 changes: 0 additions & 23 deletions expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,6 @@ func (v *Int) IntValue() int64 {
return atomic.LoadInt64(&v.i)
}

// IntFuncGauge is a 64-bit integer variable that satisfies the expvar.Var interface.
type IntFuncGauge struct {
ValueF func() int64
}

func (v *IntFuncGauge) String() string {
return strconv.FormatInt(v.IntValue(), 10)
}

func (v *IntFuncGauge) Add(delta int64) {}
func (v *IntFuncGauge) Set(value int64) {}

func (v *IntFuncGauge) IntValue() int64 {
if v == nil || v.ValueF == nil {
return 0
}
return v.ValueF()
}

func NewIntFuncGauge(fn func() int64) *IntFuncGauge {
return &IntFuncGauge{fn}
}

// IntSum is a 64-bit integer variable that consists of multiple different parts
// and satisfies the expvar.Var interface.
// The value of the var is the sum of all its parts.
Expand Down
26 changes: 0 additions & 26 deletions flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand Down Expand Up @@ -50,24 +49,13 @@ type flattenBatchBuffer struct {
}

func (n *FlattenNode) runFlatten([]byte) error {
var mu sync.RWMutex
switch n.Wants() {
case pipeline.StreamEdge:
flattenBuffers := make(map[models.GroupID]*flattenStreamBuffer)
valueF := func() int64 {
mu.RLock()
l := len(flattenBuffers)
mu.RUnlock()
return int64(l)
}
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() {
n.timer.Start()
t := p.Time.Round(n.f.Tolerance)
mu.RLock()
currentBuf, ok := flattenBuffers[p.Group]
mu.RUnlock()
if !ok {
currentBuf = &flattenStreamBuffer{
Time: t,
Expand All @@ -76,9 +64,7 @@ func (n *FlattenNode) runFlatten([]byte) error {
Dimensions: p.Dimensions,
Tags: p.PointTags(),
}
mu.Lock()
flattenBuffers[p.Group] = currentBuf
mu.Unlock()
}
rp := models.RawPoint{
Time: t,
Expand Down Expand Up @@ -118,20 +104,10 @@ func (n *FlattenNode) runFlatten([]byte) error {
}
case pipeline.BatchEdge:
allBuffers := make(map[models.GroupID]*flattenBatchBuffer)
valueF := func() int64 {
mu.RLock()
l := len(allBuffers)
mu.RUnlock()
return int64(l)
}
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
n.timer.Start()
t := b.TMax.Round(n.f.Tolerance)
mu.RLock()
currentBuf, ok := allBuffers[b.Group]
mu.RUnlock()
if !ok {
currentBuf = &flattenBatchBuffer{
Time: t,
Expand All @@ -140,9 +116,7 @@ func (n *FlattenNode) runFlatten([]byte) error {
Tags: b.Tags,
Points: make(map[time.Time][]models.RawPoint),
}
mu.Lock()
allBuffers[b.Group] = currentBuf
mu.Unlock()
}
if !t.Equal(currentBuf.Time) {
// Flatten/Emit old buffer
Expand Down
Loading