Skip to content

Commit

Permalink
Add delimit option to JoinNode (#698)
Browse files Browse the repository at this point in the history
* Add delimit option to JoinNode

* CHANGELOG.md
  • Loading branch information
Nathaniel Cook authored Jul 5, 2016
1 parent 9f9b98a commit bf51dbd
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- [#669](https://github.com/influxdata/kapacitor/pull/669): Add size function for humanize byte size. thanks @jsvisa!
- [#697](https://github.com/influxdata/kapacitor/pull/697): Can now flatten a set of points into a single point, creating dynamically named fields.

- [#698](https://github.com/influxdata/kapacitor/pull/698): Join delimiter can be specified.

### Bugfixes

Expand Down
100 changes: 100 additions & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,106 @@ cpu0

testBatcherWithOutput(t, "TestBatch_Join", script, 30*time.Second, er, false)
}
// TestBatch_Join_Delimiter verifies that a custom join delimiter ('~') is
// used to build the prefixed field names of joined batch points, so the
// joined field can be addressed as 'cpu0~mean'.
func TestBatch_Join_Delimiter(t *testing.T) {
	script := `
var cpu0 = batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "cpu" = 'cpu0'
''')
.period(10s)
.every(10s)
.groupBy(time(2s))
var cpu1 = batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "cpu" = 'cpu1'
''')
.period(10s)
.every(10s)
.groupBy(time(2s))
cpu0
|join(cpu1)
.as('cpu0', 'cpu1')
.delimiter('~')
|count('cpu0~mean')
|window()
.period(20s)
.every(20s)
|sum('count')
|httpOut('TestBatch_Join')
`

	// One summed count of 10 is expected once the two query streams are
	// joined and the 20s window has been filled.
	want := kapacitor.Result{
		Series: imodels.Rows{
			{
				Name:    "cpu_usage_idle",
				Columns: []string{"time", "sum"},
				Values: [][]interface{}{{
					time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
					10.0,
				}},
			},
		},
	}

	testBatcherWithOutput(t, "TestBatch_Join", script, 30*time.Second, want, false)
}
// TestBatch_Join_DelimiterEmpty verifies that an empty join delimiter is
// honored: the prefix and field name are concatenated directly, yielding
// 'cpu0mean' instead of the default 'cpu0.mean'.
func TestBatch_Join_DelimiterEmpty(t *testing.T) {
	script := `
var cpu0 = batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "cpu" = 'cpu0'
''')
.period(10s)
.every(10s)
.groupBy(time(2s))
var cpu1 = batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "cpu" = 'cpu1'
''')
.period(10s)
.every(10s)
.groupBy(time(2s))
cpu0
|join(cpu1)
.as('cpu0', 'cpu1')
.delimiter('')
|count('cpu0mean')
|window()
.period(20s)
.every(20s)
|sum('count')
|httpOut('TestBatch_Join')
`

	// Same expected output as the non-empty-delimiter case: the delimiter
	// only affects field naming, not the joined values.
	want := kapacitor.Result{
		Series: imodels.Rows{
			{
				Name:    "cpu_usage_idle",
				Columns: []string{"time", "sum"},
				Values: [][]interface{}{{
					time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
					10.0,
				}},
			},
		},
	}

	testBatcherWithOutput(t, "TestBatch_Join", script, 30*time.Second, want, false)
}

func TestBatch_JoinTolerance(t *testing.T) {

Expand Down
149 changes: 149 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,155 @@ errorCounts
testStreamerWithOutput(t, "TestStream_Join", script, 13*time.Second, er, nil, true)
}

// TestStream_Join_Delimiter verifies that a custom join delimiter ('#') is
// used for the prefixed field names of joined stream points, so eval can
// reference "errors#sum" and "views#sum".
func TestStream_Join_Delimiter(t *testing.T) {
	script := `
var errorCounts = stream
|from()
.measurement('errors')
.groupBy('service')
|window()
.period(10s)
.every(10s)
.align()
|sum('value')
var viewCounts = stream
|from()
.measurement('views')
.groupBy('service')
|window()
.period(10s)
.every(10s)
.align()
|sum('value')
errorCounts
|join(viewCounts)
.as('errors', 'views')
.delimiter('#')
.streamName('error_view')
|eval(lambda: "errors#sum" / "views#sum")
.as('error_percent')
.keep()
|httpOut('TestStream_Join')
`

	// All three services emit a point at the same window boundary.
	ts := time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC)

	want := kapacitor.Result{
		Series: imodels.Rows{
			{
				Name:    "error_view",
				Tags:    map[string]string{"service": "cartA"},
				Columns: []string{"time", "error_percent", "errors#sum", "views#sum"},
				Values:  [][]interface{}{{ts, 0.01, 47.0, 4700.0}},
			},
			{
				Name:    "error_view",
				Tags:    map[string]string{"service": "login"},
				Columns: []string{"time", "error_percent", "errors#sum", "views#sum"},
				Values:  [][]interface{}{{ts, 0.01, 45.0, 4500.0}},
			},
			{
				Name:    "error_view",
				Tags:    map[string]string{"service": "front"},
				Columns: []string{"time", "error_percent", "errors#sum", "views#sum"},
				Values:  [][]interface{}{{ts, 0.01, 32.0, 3200.0}},
			},
		},
	}

	testStreamerWithOutput(t, "TestStream_Join", script, 13*time.Second, want, nil, true)
}
// TestStream_Join_DelimiterEmpty verifies that an empty join delimiter is
// honored for stream joins: prefixes and field names are concatenated
// directly, so eval references "errorssum" and "viewssum".
func TestStream_Join_DelimiterEmpty(t *testing.T) {
	script := `
var errorCounts = stream
|from()
.measurement('errors')
.groupBy('service')
|window()
.period(10s)
.every(10s)
.align()
|sum('value')
var viewCounts = stream
|from()
.measurement('views')
.groupBy('service')
|window()
.period(10s)
.every(10s)
.align()
|sum('value')
errorCounts
|join(viewCounts)
.as('errors', 'views')
.delimiter('')
.streamName('error_view')
|eval(lambda: "errorssum" / "viewssum")
.as('error_percent')
.keep()
|httpOut('TestStream_Join')
`

	// All three services emit a point at the same window boundary.
	ts := time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC)

	want := kapacitor.Result{
		Series: imodels.Rows{
			{
				Name:    "error_view",
				Tags:    map[string]string{"service": "cartA"},
				Columns: []string{"time", "error_percent", "errorssum", "viewssum"},
				Values:  [][]interface{}{{ts, 0.01, 47.0, 4700.0}},
			},
			{
				Name:    "error_view",
				Tags:    map[string]string{"service": "login"},
				Columns: []string{"time", "error_percent", "errorssum", "viewssum"},
				Values:  [][]interface{}{{ts, 0.01, 45.0, 4500.0}},
			},
			{
				Name:    "error_view",
				Tags:    map[string]string{"service": "front"},
				Columns: []string{"time", "error_percent", "errorssum", "viewssum"},
				Values:  [][]interface{}{{ts, 0.01, 32.0, 3200.0}},
			},
		},
	}

	testStreamerWithOutput(t, "TestStream_Join", script, 13*time.Second, want, nil, true)
}

func TestStream_JoinTolerance(t *testing.T) {

var script = `
Expand Down
17 changes: 11 additions & 6 deletions join.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (g *group) collect(i int, p models.PointInterface) error {
g.j.fill,
g.j.fillValue,
g.j.j.Names,
g.j.j.Delimiter,
g.j.j.Tolerance,
t,
g.j.logger,
Expand All @@ -296,6 +297,7 @@ func (g *group) collect(i int, p models.PointInterface) error {
g.j.fill,
g.j.fillValue,
g.j.j.Names,
g.j.j.Delimiter,
g.j.j.Tolerance,
t,
g.j.logger,
Expand Down Expand Up @@ -399,6 +401,7 @@ type joinset struct {
fill influxql.FillOption
fillValue interface{}
prefixes []string
delimiter string

time time.Time
tolerance time.Duration
Expand All @@ -418,6 +421,7 @@ func newJoinset(
fill influxql.FillOption,
fillValue interface{},
prefixes []string,
delimiter string,
tolerance time.Duration,
time time.Time,
l *log.Logger,
Expand All @@ -428,6 +432,7 @@ func newJoinset(
fill: fill,
fillValue: fillValue,
prefixes: prefixes,
delimiter: delimiter,
expected: expected,
values: make([]models.PointInterface, expected),
first: expected,
Expand Down Expand Up @@ -467,19 +472,19 @@ func (js *joinset) JoinIntoPoint() (models.Point, bool) {
switch js.fill {
case influxql.NullFill:
for k := range js.First().PointFields() {
fields[js.prefixes[i]+"."+k] = nil
fields[js.prefixes[i]+js.delimiter+k] = nil
}
case influxql.NumberFill:
for k := range js.First().PointFields() {
fields[js.prefixes[i]+"."+k] = js.fillValue
fields[js.prefixes[i]+js.delimiter+k] = js.fillValue
}
default:
// inner join no valid point possible
return models.Point{}, false
}
} else {
for k, v := range p.PointFields() {
fields[js.prefixes[i]+"."+k] = v
fields[js.prefixes[i]+js.delimiter+k] = v
}
}
}
Expand Down Expand Up @@ -558,19 +563,19 @@ func (js *joinset) JoinIntoBatch() (models.Batch, bool) {
switch js.fill {
case influxql.NullFill:
for _, k := range fieldNames {
fields[js.prefixes[i]+"."+k] = nil
fields[js.prefixes[i]+js.delimiter+k] = nil
}
case influxql.NumberFill:
for _, k := range fieldNames {
fields[js.prefixes[i]+"."+k] = js.fillValue
fields[js.prefixes[i]+js.delimiter+k] = js.fillValue
}
default:
// inner join no valid point possible
return models.Batch{}, false
}
} else {
for k, v := range bp.Fields {
fields[js.prefixes[i]+"."+k] = v
fields[js.prefixes[i]+js.delimiter+k] = v
}
}
}
Expand Down
13 changes: 11 additions & 2 deletions pipeline/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"time"
)

const (
	// defaultJoinDelimiter is the string inserted between a join prefix
	// (set via the .as() property) and the original field name when
	// building joined field names, e.g. "cpu0" + "." + "mean" -> "cpu0.mean".
	defaultJoinDelimiter = "."
)

// Joins the data from any number of nodes.
// As each data point is received from a parent node it is paired
// with the next data points from the other parent nodes with a
Expand Down Expand Up @@ -59,6 +63,10 @@ type JoinNode struct {
// tick:ignore
Dimensions []string `tick:"On"`

// The delimiter for the field name prefixes.
// Can be the empty string.
Delimiter string

// The name of this new joined data stream.
// If empty the name of the left parent is used.
StreamName string
Expand All @@ -82,6 +90,7 @@ type JoinNode struct {
func newJoinNode(e EdgeType, parents []Node) *JoinNode {
j := &JoinNode{
chainnode: newBasicChainNode("join", e, e),
Delimiter: defaultJoinDelimiter,
}
for _, n := range parents {
n.linkChild(j)
Expand Down Expand Up @@ -145,8 +154,8 @@ func (j *JoinNode) validate() error {
if len(name) == 0 {
return fmt.Errorf("must provide a prefix name for the join node, see .as() property method")
}
if strings.ContainsRune(name, '.') {
return fmt.Errorf("cannot use name %s as field prefix, it contains a '.' character", name)
if j.Delimiter != "" && strings.Contains(name, j.Delimiter) {
return fmt.Errorf("cannot use name %s as field prefix, it contains the delimiter %q ", name, j.Delimiter)
}
}
names := make(map[string]bool, len(j.Names))
Expand Down

0 comments on commit bf51dbd

Please sign in to comment.