Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chain methods where and groupBy back on stream node #451

Merged
merged 2 commits into from
Apr 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Example UDF config for a socket based UDF.
- [#386](https://github.com/influxdata/kapacitor/issues/386): Adds official Go HTTP client package.
- [#399](https://github.com/influxdata/kapacitor/issues/399): Allow disabling of subscriptions.
- [#417](https://github.com/influxdata/kapacitor/issues/417): UDFs can be connected over a Unix socket. This enables UDFs from across Docker containers.
- [#451](https://github.com/influxdata/kapacitor/issues/451): StreamNode supports `|groupBy` and `|where` methods.

### Bugfixes

Expand Down
111 changes: 111 additions & 0 deletions integrations/data/TestStream_GroupByWhere.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
dbname
rpname
cpu,cpu=cpu0,host=serverA value=97.1 0000000001
dbname
rpname
cpu,cpu=cpu0,host=serverB value=67.1 0000000001
dbname
rpname
cpu,cpu=cpu1,host=serverA value=87.1 0000000001
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=97.1 0000000001
dbname
rpname
cpu,cpu=cpu0,host=serverA value=91.6 0000000002
dbname
rpname
cpu,cpu=cpu1,host=serverA value=82.6 0000000002
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=92.6 0000000002
dbname
rpname
cpu,cpu=cpu0,host=serverA value=35.6 0000000003
dbname
rpname
cpu,cpu=cpu1,host=serverA value=85.6 0000000003
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=95.6 0000000003
dbname
rpname
cpu,cpu=cpu0,host=serverA value=73.1 0000000004
dbname
rpname
cpu,cpu=cpu1,host=serverA value=63.1 0000000004
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=93.1 0000000004
dbname
rpname
cpu,cpu=cpu0,host=serverA value=32.6 0000000005
dbname
rpname
cpu,cpu=cpu1,host=serverA value=72.6 0000000005
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=92.6 0000000005
dbname
rpname
cpu,cpu=cpu0,host=serverA value=94.8 0000000006
dbname
rpname
cpu,cpu=cpu1,host=serverA value=25.8 0000000006
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=95.8 0000000006
dbname
rpname
cpu,cpu=cpu0,host=serverC value=15.8 0000000007
dbname
rpname
cpu,cpu=cpu1,host=serverA value=82.7 0000000007
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=92.7 0000000007
dbname
rpname
cpu,cpu=cpu0,host=serverA value=82.7 0000000008
dbname
rpname
cpu,cpu=cpu1,host=serverA value=66.0 0000000008
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=96.0 0000000008
dbname
rpname
cpu,cpu=cpu0,host=serverA value=86.0 0000000009
dbname
rpname
cpu,cpu=cpu1,host=serverA value=73.4 0000000009
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=93.4 0000000009
dbname
rpname
cpu,cpu=cpu0,host=serverA value=73.4 0000000010
dbname
rpname
cpu,cpu=cpu1,host=serverA value=85.3 0000000010
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=95.3 0000000010
dbname
rpname
cpu,cpu=cpu0,host=serverA value=65.3 0000000011
dbname
rpname
cpu,cpu=cpu1,host=serverA value=93.4 0000000011
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=96.4 0000000011
dbname
rpname
cpu,cpu=cpu0,host=serverA value=15.1 0000000012
dbname
rpname
cpu,cpu=cpu1,host=serverA value=95.1 0000000012
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=95.1 0000000012
63 changes: 63 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,69 @@ stream
testStreamerWithOutput(t, "TestStream_GroupBy", script, 13*time.Second, er, nil, false)
}

func TestStream_GroupByWhere(t *testing.T) {

var script = `
var serverA = stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')

var byCpu = serverA
|groupBy('host', 'cpu')

var total = serverA
|where(lambda: "cpu" == 'cpu-total')

byCpu
|join(total)
.on('host')
.as('cpu', 'total')
|eval(lambda: "cpu.value" / "total.value")
.as('cpu_percent')
|window()
.period(10s)
.every(10s)
|mean('cpu_percent')
|httpOut('TestStream_GroupByWhere')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: map[string]string{"cpu": "cpu0", "host": "serverA"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
0.7823116704593873,
}},
},
{
Name: "cpu",
Tags: map[string]string{"cpu": "cpu1", "host": "serverA"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
0.7676074281820646,
}},
},
{
Name: "cpu",
Tags: map[string]string{"cpu": "cpu-total", "host": "serverA"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
1.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_GroupByWhere", script, 13*time.Second, er, nil, false)
}

func TestStream_Join(t *testing.T) {

var script = `
Expand Down
59 changes: 58 additions & 1 deletion pipeline/stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pipeline

import (
"fmt"
"time"

"github.com/influxdata/kapacitor/tick"
Expand Down Expand Up @@ -72,6 +73,10 @@ func (s *SourceStreamNode) From() *StreamNode {
// the tag `host` matches the regex `logger\d+`
type StreamNode struct {
chainnode

// self describer
describer *tick.ReflectionDescriber

// An expression to filter the data stream.
// tick:ignore
Expression tick.Node `tick:"Where"`
Expand Down Expand Up @@ -105,9 +110,12 @@ type StreamNode struct {
}

func newStreamNode() *StreamNode {
return &StreamNode{
s := &StreamNode{
chainnode: newBasicChainNode("stream", StreamEdge, StreamEdge),
}
s.describer, _ = tick.NewReflectionDescriber(s)
return s

}

// Creates a new stream node that can be further
Expand Down Expand Up @@ -225,3 +233,52 @@ func (s *StreamNode) GroupBy(tag ...interface{}) *StreamNode {
s.Dimensions = tag
return s
}

// Tick Describer methods

//tick:ignore
func (s *StreamNode) Desc() string {
return s.describer.Desc()
}

//tick:ignore
func (s *StreamNode) HasChainMethod(name string) bool {
if name == "groupBy" || name == "where" {
return true
}
return s.describer.HasChainMethod(name)
}

//tick:ignore
func (s *StreamNode) CallChainMethod(name string, args ...interface{}) (interface{}, error) {
switch name {
case "groupBy":
return s.chainnode.GroupBy(args...), nil
case "where":
if len(args) != 1 {
return nil, fmt.Errorf("invalid number of args to |where() got %d exp 1", len(args))
}
expr, ok := args[0].(tick.Node)
if !ok {
return nil, fmt.Errorf("invalid arg to |where() got %T exp tick.Node", args[0])
}
return s.chainnode.Where(expr), nil
default:
return s.describer.CallChainMethod(name, args...)
}
}

//tick:ignore
func (s *StreamNode) HasProperty(name string) bool {
return s.describer.HasProperty(name)
}

//tick:ignore
func (s *StreamNode) Property(name string) interface{} {
return s.describer.Property(name)
}

//tick:ignore
func (s *StreamNode) SetProperty(name string, args ...interface{}) (interface{}, error) {
return s.describer.SetProperty(name, args...)
}