Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shift node so values from different times can be joined #231

Merged
merged 1 commit into from
Feb 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ kapacitor*.zip
*.pyc
*.test
/test-logs
*.prof
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### Release Notes

### Features
- [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons.


### Bugfixes
- [#199](https://github.com/influxdata/kapacitor/issues/199): BREAKING: Various fixes for the Alerta integration.
Expand Down
78 changes: 78 additions & 0 deletions integrations/data/TestStream_Shift.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
dbname
rpname
cpu,type=idle,host=serverA value=97.1 0000000001
dbname
rpname
cpu,type=idle,host=serverB value=97.1 0000000001
dbname
rpname
disk,type=sda,host=serverB value=39 0000000001
dbname
rpname
cpu,type=idle,host=serverB value=92.6 0000000002
dbname
rpname
cpu,type=idle,host=serverA value=95.6 0000000003
dbname
rpname
cpu,type=idle,host=serverB value=95.6 0000000003
dbname
rpname
cpu,type=idle,host=serverA value=93.1 0000000004
dbname
rpname
cpu,type=idle,host=serverB value=93.1 0000000004
dbname
rpname
cpu,type=idle,host=serverA value=92.6 0000000005
dbname
rpname
cpu,type=idle,host=serverB value=92.6 0000000005
dbname
rpname
cpu,type=idle,host=serverA value=95.8 0000000006
dbname
rpname
cpu,type=idle,host=serverB value=95.8 0000000006
dbname
rpname
cpu,type=idle,host=serverC value=95.8 0000000006
dbname
rpname
cpu,type=idle,host=serverA value=92.7 0000000007
dbname
rpname
cpu,type=idle,host=serverB value=92.7 0000000007
dbname
rpname
cpu,type=idle,host=serverA value=96.0 0000000008
dbname
rpname
cpu,type=idle,host=serverB value=96.0 0000000008
dbname
rpname
cpu,type=idle,host=serverA value=93.4 0000000009
dbname
rpname
cpu,type=idle,host=serverB value=93.4 0000000009
dbname
rpname
disk,type=sda,host=serverB value=423 0000000009
dbname
rpname
cpu,type=idle,host=serverA value=95.3 0000000010
dbname
rpname
cpu,type=idle,host=serverB value=95.3 0000000010
dbname
rpname
cpu,type=idle,host=serverA value=96.4 0000000011
dbname
rpname
cpu,type=idle,host=serverB value=96.4 0000000011
dbname
rpname
cpu,type=idle,host=serverA value=95.1 0000000012
dbname
rpname
cpu,type=idle,host=serverB value=95.1 0000000012
204 changes: 204 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,210 @@ stream
testStreamerWithOutput(t, "TestStream_Window", script, 13*time.Second, er, nil, false)
}

func TestStream_Shift(t *testing.T) {

var script = `
var period = 5s

var data = stream
.from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')

var past = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))
.shift(period)

var current = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))

past.join(current)
.as('past', 'current')
.eval(lambda: "current.count" - "past.count")
.keep()
.as('diff')
.httpOut('TestStream_Shift')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "current.count", "diff", "past.count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
5.0,
1.0,
4.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false)
}

func TestStream_ShiftBatch(t *testing.T) {

var script = `
var period = 5s

var data = stream
.from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')

var past = data
.window()
.period(period)
.every(period)
.align()
.shift(period)
.mapReduce(influxql.count('value'))

var current = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))

past.join(current)
.as('past', 'current')
.eval(lambda: "current.count" - "past.count")
.keep()
.as('diff')
.httpOut('TestStream_Shift')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "current.count", "diff", "past.count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
5.0,
1.0,
4.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false)
}

func TestStream_ShiftNegative(t *testing.T) {

var script = `
var period = 5s

var data = stream
.from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')

var past = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))

var current = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))
.shift(-period)

past.join(current)
.as('past', 'current')
.eval(lambda: "current.count" - "past.count")
.keep()
.as('diff')
.httpOut('TestStream_Shift')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "current.count", "diff", "past.count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
5.0,
1.0,
4.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false)
}

func TestStream_ShiftBatchNegative(t *testing.T) {

var script = `
var period = 5s

var data = stream
.from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')

var past = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))

var current = data
.window()
.period(period)
.every(period)
.align()
.shift(-period)
.mapReduce(influxql.count('value'))

past.join(current)
.as('past', 'current')
.eval(lambda: "current.count" - "past.count")
.keep()
.as('diff')
.httpOut('TestStream_Shift')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "current.count", "diff", "past.count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
5.0,
1.0,
4.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false)
}

func TestStream_SimpleMR(t *testing.T) {

var script = `
Expand Down
7 changes: 7 additions & 0 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,10 @@ func (n *chainnode) Derivative(field string) *DerivativeNode {
n.linkChild(s)
return s
}

// Create a new node that shifts the incoming points or batches in time.
func (n *chainnode) Shift(shift time.Duration) *ShiftNode {
s := newShiftNode(n.Provides(), shift)
n.linkChild(s)
return s
}
34 changes: 34 additions & 0 deletions pipeline/shift.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package pipeline

import (
"time"
)

// Shift points and batches in time, this is useful for comparing
// batches or points from different times.
//
// Example:
// stream
// .shift(5m)
//
// Shift all data points 5m forward in time.
//
// Example:
// stream
// .shift(-10s)
//
// Shift all data points 10s backward in time.
type ShiftNode struct {
chainnode

// Keep one point or batch every Duration
// tick:ignore
Shift time.Duration
}

func newShiftNode(wants EdgeType, shift time.Duration) *ShiftNode {
return &ShiftNode{
chainnode: newBasicChainNode("shift", wants, wants),
Shift: shift,
}
}
Loading