Skip to content

Commit

Permalink
Merge pull request #248 from influxdata/nc-issue#215
Browse files Browse the repository at this point in the history
Add performance metrics to tasks and nodes
  • Loading branch information
Nathaniel Cook committed Feb 24, 2016
2 parents 4be3e10 + e88b6c4 commit ad2c993
Show file tree
Hide file tree
Showing 43 changed files with 1,522 additions and 387 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,22 @@

### Release Notes

Kapacitor now exposes more internal metrics for determining the performance of a given task.
The internal statistics include a new measurement named `node` that contains any stats a node provides, tagged by the task, node, task type, and kind of node (i.e. window vs union).
All nodes provide an averaged execution time for the node.
These stats are also available in the DOT output of the Kapacitor show command.

Significant performance improvements have also been added.
In some cases Kapacitor throughput has improved by 4X.


### Features
- [#236](https://github.com/influxdata/kapacitor/issues/236): Implement batched group by
- [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons.
- [#190](https://github.com/influxdata/kapacitor/issues/190): BREAKING: Deadman's switch now triggers off emitted counts and is grouped by the original grouping of the data.
The breaking change is that the 'collected' stat is no longer output for `.stats` and has been replaced by `emitted`.
- [#145](https://github.com/influxdata/kapacitor/issues/145): The InfluxDB Out Node now writes data to InfluxDB in buffers.
- [#215](https://github.com/influxdata/kapacitor/issues/215): Add performance metrics to nodes for average execution times and node throughput values.


### Bugfixes
Expand Down
5 changes: 5 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,11 @@ func (a *AlertNode) runAlert([]byte) error {
switch a.Wants() {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
a.timer.Start()
l := a.determineLevel(p.Time, p.Fields, p.Tags)
state := a.updateState(l, p.Group)
if (a.a.UseFlapping && state.flapping) || (a.a.IsStateChangesOnly && !state.changed) {
a.timer.Stop()
continue
}
// send alert if we are not OK or we are OK and state changed (i.e recovery)
Expand All @@ -293,9 +295,11 @@ func (a *AlertNode) runAlert([]byte) error {
h(ad)
}
}
a.timer.Stop()
}
case pipeline.BatchEdge:
for b, ok := a.ins[0].NextBatch(); ok; b, ok = a.ins[0].NextBatch() {
a.timer.Start()
triggered := false
for _, p := range b.Points {
l := a.determineLevel(p.Time, p.Fields, p.Tags)
Expand Down Expand Up @@ -331,6 +335,7 @@ func (a *AlertNode) runAlert([]byte) error {
}
}
}
a.timer.Stop()
}
}
return nil
Expand Down
18 changes: 14 additions & 4 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/gorhill/cronexpr"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdb/influxdb/client"
client "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/influxdb/influxql"
)

Expand Down Expand Up @@ -96,7 +96,13 @@ func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string {

// Do not add the source batch node to the dot output
// since it's not really an edge.
func (s *SourceBatchNode) edot(buf *bytes.Buffer) {
func (s *SourceBatchNode) edot(*bytes.Buffer, bool) {}

func (s *SourceBatchNode) collectedCount() (count int64) {
for _, child := range s.children {
count += child.collectedCount()
}
return
}

type BatchNode struct {
Expand Down Expand Up @@ -244,6 +250,7 @@ func (b *BatchNode) doQuery() error {
case <-b.aborting:
return errors.New("batch doQuery aborted")
case now := <-tickC:
b.timer.Start()

// Update times for query
stop := now.Add(-1 * b.b.Offset)
Expand All @@ -267,8 +274,8 @@ func (b *BatchNode) doQuery() error {
return err
}

if resp.Err != nil {
return resp.Err
if err := resp.Error(); err != nil {
return err
}

// Collect batches
Expand All @@ -277,10 +284,13 @@ func (b *BatchNode) doQuery() error {
if err != nil {
return err
}
b.timer.Pause()
for _, bch := range batches {
b.ins[0].CollectBatch(bch)
}
b.timer.Resume()
}
b.timer.Stop()
}
}
}
Expand Down
33 changes: 6 additions & 27 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package run

import (
"errors"
"expvar"
"fmt"
"io/ioutil"
"log"
Expand Down Expand Up @@ -45,27 +44,6 @@ import (
const clusterIDFilename = "cluster.id"
const serverIDFilename = "server.id"

var (
//Published vars
cidVar = &expvar.String{}

sidVar = &expvar.String{}

hostVar = &expvar.String{}

productVar = &expvar.String{}

versionVar = &expvar.String{}
)

func init() {
expvar.Publish(kapacitor.ClusterIDVarName, cidVar)
expvar.Publish(kapacitor.ServerIDVarName, sidVar)
expvar.Publish(kapacitor.HostVarName, hostVar)
expvar.Publish(kapacitor.ProductVarName, productVar)
expvar.Publish(kapacitor.VersionVarName, versionVar)
}

// BuildInfo represents the build details for the server code.
type BuildInfo struct {
Version string
Expand Down Expand Up @@ -377,6 +355,7 @@ func (s *Server) appendStatsService(c stats.Config) {
srv := stats.NewService(c, l)
srv.TaskMaster = s.TaskMaster

s.TaskMaster.TimingService = srv
s.Services = append(s.Services, srv)
}
}
Expand Down Expand Up @@ -413,11 +392,11 @@ func (s *Server) Open() error {
}

// Set published vars
cidVar.Set(s.ClusterID)
sidVar.Set(s.ServerID)
hostVar.Set(s.hostname)
productVar.Set(kapacitor.Product)
versionVar.Set(s.buildInfo.Version)
kapacitor.ClusterIDVar.Set(s.ClusterID)
kapacitor.ServerIDVar.Set(s.ServerID)
kapacitor.HostVar.Set(s.hostname)
kapacitor.ProductVar.Set(kapacitor.Product)
kapacitor.VersionVar.Set(s.buildInfo.Version)
s.Logger.Printf("I! ClusterID: %s ServerID: %s", s.ClusterID, s.ServerID)

// Start profiling, if set.
Expand Down
6 changes: 5 additions & 1 deletion cmd/kapacitord/run/server_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/influxdata/kapacitor/cmd/kapacitord/run"
"github.com/influxdata/kapacitor/services/task_store"
"github.com/influxdata/kapacitor/wlog"
"github.com/influxdb/influxdb/client"
client "github.com/influxdb/influxdb/client/v2"
)

// Server represents a test wrapper for run.Server.
Expand Down Expand Up @@ -382,6 +382,10 @@ type InfluxDB struct {

func NewInfluxDB(callback queryFunc) *InfluxDB {
handler := func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/ping" {
w.WriteHeader(http.StatusNoContent)
return
}
q := r.URL.Query().Get("q")
res := callback(q)
enc := json.NewEncoder(w)
Expand Down
72 changes: 63 additions & 9 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"net/url"
"os"
"os/exec"
"path"
"path/filepath"
"reflect"
Expand All @@ -20,7 +21,7 @@ import (
"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/cmd/kapacitord/run"
"github.com/influxdata/kapacitor/services/udf"
"github.com/influxdb/influxdb/client"
client "github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/toml"
Expand Down Expand Up @@ -162,9 +163,16 @@ func TestServer_EnableTask(t *testing.T) {
if ti.TICKscript != tick {
t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick)
}
dot := "digraph testTaskName {\nsrcstream0 -> stream1 [label=\"0\"];\n}"
dot := `digraph testTaskName {
graph [throughput="0.00 points/s"];
srcstream0 [avg_exec_time="0" ];
srcstream0 -> stream1 [processed="0"];
stream1 [avg_exec_time="0" ];
}`
if ti.Dot != dot {
t.Fatalf("unexpected dot got %s exp %s", ti.Dot, dot)
t.Fatalf("unexpected dot got\n%s exp\n%s", ti.Dot, dot)
}
}

Expand Down Expand Up @@ -746,21 +754,44 @@ batch

func TestServer_UDFStreamAgents(t *testing.T) {
dir, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
udfDir := filepath.Clean(filepath.Join(dir, "../../../udf"))

tdir, err := ioutil.TempDir("", "kapacitor_server_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tdir)

agents := []struct {
buildFunc func() error
config udf.FunctionConfig
}{
// Go
{
buildFunc: func() error { return nil },
buildFunc: func() error {
// Explicitly compile the binary.
// We could just use 'go run' but I ran into race conditions
// where 'go run' was not handing off to the compiled process in time
// and I didn't care to dig into 'go run's specific behavior.
cmd := exec.Command(
"go",
"build",
"-o",
filepath.Join(tdir, "movavg"),
filepath.Join(udfDir, "agent/examples/moving_avg/moving_avg.go"),
)
out, err := cmd.CombinedOutput()
if err != nil {
t.Log(string(out))
return err
}
return nil
},
config: udf.FunctionConfig{
Prog: "go",
Args: []string{"run", filepath.Join(udfDir, "agent/examples/moving_avg/moving_avg.go")},
Prog: filepath.Join(tdir, "movavg"),
Timeout: toml.Duration(time.Minute),
},
},
Expand Down Expand Up @@ -885,21 +916,44 @@ test,group=b value=0 0000000011

func TestServer_UDFBatchAgents(t *testing.T) {
dir, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
udfDir := filepath.Clean(filepath.Join(dir, "../../../udf"))

tdir, err := ioutil.TempDir("", "kapacitor_server_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tdir)

agents := []struct {
buildFunc func() error
config udf.FunctionConfig
}{
// Go
{
buildFunc: func() error { return nil },
buildFunc: func() error {
// Explicitly compile the binary.
// We could just use 'go run' but I ran into race conditions
// where 'go run' was not handing off to the compiled process in time
// and I didn't care to dig into 'go run's specific behavior.
cmd := exec.Command(
"go",
"build",
"-o",
filepath.Join(tdir, "outliers"),
filepath.Join(udfDir, "agent/examples/outliers/outliers.go"),
)
out, err := cmd.CombinedOutput()
if err != nil {
t.Log(string(out))
return err
}
return nil
},
config: udf.FunctionConfig{
Prog: "go",
Args: []string{"run", filepath.Join(udfDir, "agent/examples/outliers/outliers.go")},
Prog: filepath.Join(tdir, "outliers"),
Timeout: toml.Duration(time.Minute),
},
},
Expand Down
6 changes: 6 additions & 0 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (d *DerivativeNode) runDerivative([]byte) error {
case pipeline.StreamEdge:
previous := make(map[models.GroupID]models.Point)
for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() {
d.timer.Start()
pr, ok := previous[p.Group]
if !ok {
previous[p.Group] = p
Expand All @@ -40,17 +41,21 @@ func (d *DerivativeNode) runDerivative([]byte) error {
fields := pr.Fields.Copy()
fields[d.d.As] = value
pr.Fields = fields
d.timer.Pause()
for _, child := range d.outs {
err := child.CollectPoint(pr)
if err != nil {
return err
}
}
d.timer.Resume()
}
previous[p.Group] = p
d.timer.Stop()
}
case pipeline.BatchEdge:
for b, ok := d.ins[0].NextBatch(); ok; b, ok = d.ins[0].NextBatch() {
d.timer.Start()
if len(b.Points) > 0 {
pr := b.Points[0]
var p models.BatchPoint
Expand All @@ -69,6 +74,7 @@ func (d *DerivativeNode) runDerivative([]byte) error {
}
b.Points = b.Points[:len(b.Points)-1]
}
d.timer.Stop()
for _, child := range d.outs {
err := child.CollectBatch(b)
if err != nil {
Expand Down
Loading

0 comments on commit ad2c993

Please sign in to comment.