Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Several fixes to make batch queries work better as CQ operations #412

Merged
merged 1 commit into from
Apr 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ format a TICKscript file according to a common standard.
- [#389](https://github.com/influxdata/kapacitor/pull/389): Adds benchmarks to Kapacitor for basic use cases.
- [#390](https://github.com/influxdata/kapacitor/issues/390): BREAKING: Remove old `.mapReduce` functions.
- [#381](https://github.com/influxdata/kapacitor/pull/381): Adding enable/disable/delete/reload tasks by glob.

- [#401](https://github.com/influxdata/kapacitor/issues/401): Add `.align()` property to BatchNode so you can align query start and stop times.

### Bugfixes

- [#378](https://github.com/influxdata/kapacitor/issues/378): Fix issue where derivative would divide by zero.
- [#387](https://github.com/influxdata/kapacitor/issues/387): Add `.quiet()` option to EvalNode so errors can be suppressed if expected.
- [#400](https://github.com/influxdata/kapacitor/issues/400): All query/connection errors are counted and reported in BatchNode stats.
- [#412](https://github.com/influxdata/kapacitor/pull/412): Fix issues with batch queries dropping points because of nil fields.


## v0.11.0 [2016-03-22]
Expand Down
88 changes: 64 additions & 24 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func (s *SourceBatchNode) collectedCount() (count int64) {
return
}

const (
statsQueryErrors = "query_errors"
statsConnectErrors = "connect_errors"
)

type BatchNode struct {
node
b *pipeline.BatchNode
Expand Down Expand Up @@ -160,23 +165,7 @@ func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode, l *log.Logger) (*Bat
}
switch {
case n.Every != 0:
bn.ticker = newTimeTicker(n.Every)
case n.Cron != "":
var err error
bn.ticker, err = newCronTicker(n.Cron)
if err != nil {
return nil, err
}
default:
return nil, errors.New("must define one of 'every' or 'cron'")
}

if n.Every != 0 && n.Cron != "" {
return nil, errors.New("must not set both 'every' and 'cron' properties")
}
switch {
case n.Every != 0:
bn.ticker = newTimeTicker(n.Every)
bn.ticker = newTimeTicker(n.Every, n.AlignFlag)
case n.Cron != "":
var err error
bn.ticker, err = newCronTicker(n.Cron)
Expand Down Expand Up @@ -270,6 +259,7 @@ func (b *BatchNode) doQuery() error {
if err != nil {
b.logger.Println("E! failed to connect to InfluxDB:", err)
b.timer.Stop()
b.statMap.Add(statsConnectErrors, 1)
break
}
q := client.Query{
Expand All @@ -281,12 +271,14 @@ func (b *BatchNode) doQuery() error {
if err != nil {
b.logger.Println("E! query failed:", err)
b.timer.Stop()
b.statMap.Add(statsQueryErrors, 1)
break
}

if err := resp.Error(); err != nil {
b.logger.Println("E! query failed:", err)
b.timer.Stop()
b.statMap.Add(statsQueryErrors, 1)
break
}

Expand All @@ -295,6 +287,7 @@ func (b *BatchNode) doQuery() error {
batches, err := models.ResultToBatches(res)
if err != nil {
b.logger.Println("E! failed to understand query result:", err)
b.statMap.Add(statsQueryErrors, 1)
continue
}
b.timer.Pause()
Expand All @@ -312,6 +305,8 @@ func (b *BatchNode) doQuery() error {
}

func (b *BatchNode) runBatch([]byte) error {
b.statMap.Add(statsQueryErrors, 0)
b.statMap.Add(statsConnectErrors, 0)
errC := make(chan error, 1)
go func() {
defer func() {
Expand Down Expand Up @@ -371,20 +366,61 @@ type ticker interface {
}

type timeTicker struct {
every time.Duration
ticker *time.Ticker
mu sync.Mutex
every time.Duration
alignChan chan time.Time
stopping chan struct{}
ticker *time.Ticker
mu sync.Mutex
wg sync.WaitGroup
}

func newTimeTicker(every time.Duration) *timeTicker {
return &timeTicker{every: every}
func newTimeTicker(every time.Duration, align bool) *timeTicker {
t := &timeTicker{
every: every,
}
if align {
t.alignChan = make(chan time.Time)
t.stopping = make(chan struct{})
}
return t
}

func (t *timeTicker) Start() <-chan time.Time {
t.mu.Lock()
defer t.mu.Unlock()
t.ticker = time.NewTicker(t.every)
return t.ticker.C
if t.alignChan != nil {
t.wg.Add(1)
go func() {
defer t.wg.Done()
// Sleep until we are roughly aligned
now := time.Now()
next := now.Truncate(t.every).Add(t.every)
after := time.NewTicker(next.Sub(now))
select {
case <-after.C:
after.Stop()
case <-t.stopping:
after.Stop()
return
}
t.ticker = time.NewTicker(t.every)
// Send first event since we waited for it explicitly
t.alignChan <- next
for {
select {
case <-t.stopping:
return
case now := <-t.ticker.C:
now = now.Truncate(t.every)
t.alignChan <- now
}
}
}()
return t.alignChan
} else {
t.ticker = time.NewTicker(t.every)
return t.ticker.C
}
}

func (t *timeTicker) Stop() {
Expand All @@ -393,6 +429,10 @@ func (t *timeTicker) Stop() {
if t.ticker != nil {
t.ticker.Stop()
}
if t.alignChan != nil {
close(t.stopping)
}
t.wg.Wait()
}

func (t *timeTicker) Next(now time.Time) time.Time {
Expand Down
70 changes: 43 additions & 27 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@ import (
"log"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick"
)

const (
statsEvalErrors = "eval_errors"
)

type EvalNode struct {
node
e *pipeline.EvalNode
expressions []*tick.StatefulExpr
evalErrors *expvar.Int
}

// Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point.
Expand All @@ -36,15 +42,13 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
}

func (e *EvalNode) runEval(snapshot []byte) error {
e.evalErrors = &expvar.Int{}
e.statMap.Set(statsEvalErrors, e.evalErrors)
switch e.Provides() {
case pipeline.StreamEdge:
for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
e.timer.Start()
fields, err := e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
return err
}
p.Fields = fields
p.Fields = e.eval(p.Time, p.Fields, p.Tags)
e.timer.Stop()
for _, child := range e.outs {
err := child.CollectPoint(p)
Expand All @@ -57,11 +61,7 @@ func (e *EvalNode) runEval(snapshot []byte) error {
for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
e.timer.Start()
for i, p := range b.Points {
fields, err := e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
return err
}
b.Points[i].Fields = fields
b.Points[i].Fields = e.eval(p.Time, p.Fields, p.Tags)
}
e.timer.Stop()
for _, child := range e.outs {
Expand All @@ -75,27 +75,35 @@ func (e *EvalNode) runEval(snapshot []byte) error {
return nil
}

func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) (models.Fields, error) {
func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) models.Fields {
vars, err := mergeFieldsAndTags(now, fields, tags)
if err != nil {
return nil, err
e.logger.Println("E!", err)
return nil
}
for i, expr := range e.expressions {
v, err := expr.EvalNum(vars)
if err != nil {
return nil, err
if v, err := expr.EvalNum(vars); err == nil {
name := e.e.AsList[i]
vars.Set(name, v)
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
}
name := e.e.AsList[i]
vars.Set(name, v)
}
var newFields models.Fields
if e.e.KeepFlag {
if l := len(e.e.KeepList); l != 0 {
newFields = make(models.Fields, l)
for _, f := range e.e.KeepList {
newFields[f], err = vars.Get(f)
if err != nil {
return nil, err
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
}
}
} else {
Expand All @@ -104,20 +112,28 @@ func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]str
newFields[f] = v
}
for _, f := range e.e.AsList {
newFields[f], err = vars.Get(f)
if err != nil {
return nil, err
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
}
}
}
} else {
newFields = make(models.Fields, len(e.e.AsList))
for _, f := range e.e.AsList {
newFields[f], err = vars.Get(f)
if err != nil {
return nil, err
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
}
}
}
return newFields, nil
return newFields
}
19 changes: 19 additions & 0 deletions expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"expvar"
"fmt"
"math"
"sort"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -174,6 +175,24 @@ func (v *Map) Do(f func(expvar.KeyValue)) {
v.doLocked(f)
}

// DoSorted calls f for each entry in the map in sorted order.
// The map is locked during the iteration,
// but existing entries may be concurrently updated.
func (v *Map) DoSorted(f func(expvar.KeyValue)) {
v.mu.RLock()
defer v.mu.RUnlock()
keys := make([]string, len(v.m))
i := 0
for key := range v.m {
keys[i] = key
i++
}
sort.Strings(keys)
for _, k := range keys {
f(expvar.KeyValue{k, v.m[k]})
}
}

// doLocked calls f for each entry in the map.
// v.mu must be held for reads.
func (v *Map) doLocked(f func(expvar.KeyValue)) {
Expand Down
Loading