diff --git a/CHANGELOG.md b/CHANGELOG.md
index f8b47feb1..10828e324 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,31 @@
 
 ### Release Notes
 
+Kapacitor is now using the functions from the new query engine in InfluxDB core.
+Along with this change, the TICKscript API has changed so that using the InfluxQL functions is easier.
+Simply call the desired method directly; there is no need to call `.mapReduce` explicitly.
+This change hides the map/reduce aspect and handles it internally.
+Using `.mapReduce` is officially deprecated in this release and will be removed in the next major release.
+We feel that this change improves the readability of TICKscripts and exposes fewer implementation details
+to the end user.
+Updating your existing TICKscripts is simple.
+If previously you had code like this:
+
+```javascript
+stream.from()...
+    .window()...
+    .mapReduce(influxql.count('value'))
+```
+then update it to look like this:
+
+```javascript
+stream.from()...
+    .window()...
+    .count('value')
+```
+
+A simple regex could fix all of your existing scripts.
+
 Kapacitor now exposes more internal metrics for determining the performance of a given task.
 The internal statistics includes a new measurement named `node` that contains any stats a node provides, tagged by the task, node, task type and kind of node (i.e. window vs union).
 All nodes provide an averaged execution time for the node.
@@ -24,6 +49,7 @@ With #144 you can now join streams with differing group by dimensions.
 - [#145](https://github.com/influxdata/kapacitor/issues/145): The InfluxDB Out Node now writes data to InfluxDB in buffers.
 - [#215](https://github.com/influxdata/kapacitor/issues/215): Add performance metrics to nodes for average execution times and node throughput values.
 - [#144](https://github.com/influxdata/kapacitor/issues/144): Can now join streams with differing dimensions using the join.On property.
+- [#249](https://github.com/influxdata/kapacitor/issues/249): Can now use InfluxQL functions directly instead of via the MapReduce method. Example `stream.from().count()`.
 
 ### Bugfixes
 
diff --git a/build.py b/build.py
index ca10afa0a..6b1c841f9 100755
--- a/build.py
+++ b/build.py
@@ -119,6 +119,7 @@ def package_scripts(build_root):
 def run_generate():
     print "Running generate..."
     run("go get github.com/gogo/protobuf/protoc-gen-gogo")
+    run("go get github.com/benbjohnson/tmpl")
     run("go generate ./...")
     print "Generate succeeded."
     
return True diff --git a/functions.go b/functions.go index e0dee41e5..4806da03a 100644 --- a/functions.go +++ b/functions.go @@ -78,22 +78,22 @@ func (influxqlMapReducers) Percentile(field string, p float64) pipeline.MapReduc return mr(field, "percentile", pipeline.StreamEdge, tsdb.MapEcho, r) } -func (influxqlMapReducers) Top(limit int64, field string, fieldsOrTags ...string) pipeline.MapReduceInfo { +func (influxqlMapReducers) Top(limit int64, field string, fieldsAndTags ...string) pipeline.MapReduceInfo { m := func(in *tsdb.MapInput) interface{} { - return tsdb.MapTopBottom(in, int(limit), fieldsOrTags, len(fieldsOrTags)+2, "top") + return tsdb.MapTopBottom(in, int(limit), fieldsAndTags, len(fieldsAndTags)+2, "top") } r := func(values []interface{}) interface{} { - return tsdb.ReduceTopBottom(values, int(limit), fieldsOrTags, "top") + return tsdb.ReduceTopBottom(values, int(limit), fieldsAndTags, "top") } return mr(field, "top", pipeline.BatchEdge, m, r) } -func (influxqlMapReducers) Bottom(limit int64, field string, fieldsOrTags ...string) pipeline.MapReduceInfo { +func (influxqlMapReducers) Bottom(limit int64, field string, fieldsAndTags ...string) pipeline.MapReduceInfo { m := func(in *tsdb.MapInput) interface{} { - return tsdb.MapTopBottom(in, int(limit), fieldsOrTags, len(fieldsOrTags)+2, "bottom") + return tsdb.MapTopBottom(in, int(limit), fieldsAndTags, len(fieldsAndTags)+2, "bottom") } r := func(values []interface{}) interface{} { - return tsdb.ReduceTopBottom(values, int(limit), fieldsOrTags, "bottom") + return tsdb.ReduceTopBottom(values, int(limit), fieldsAndTags, "bottom") } return mr(field, "bottom", pipeline.BatchEdge, m, r) } diff --git a/influxql.gen.go b/influxql.gen.go new file mode 100644 index 000000000..d0a23b2be --- /dev/null +++ b/influxql.gen.go @@ -0,0 +1,499 @@ +// Generated by tmpl +// https://github.com/benbjohnson/tmpl +// +// DO NOT EDIT! 
+// Source: influxql.gen.go.tmpl + +package kapacitor + +import ( + "fmt" + "time" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/kapacitor/models" + "github.com/influxdata/kapacitor/pipeline" +) + +type floatPointAggregator struct { + field string + topBottomInfo *pipeline.TopBottomCallInfo + aggregator influxql.FloatPointAggregator +} + +func floatPopulateAuxFieldsAndTags(ap *influxql.FloatPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) { + ap.Aux = make([]interface{}, len(fieldsAndTags)) + for i, name := range fieldsAndTags { + if f, ok := fields[name]; ok { + ap.Aux[i] = f + } else { + ap.Aux[i] = tags[name] + } + } +} + +func (a *floatPointAggregator) AggregateBatch(b *models.Batch) { + for _, p := range b.Points { + ap := &influxql.FloatPoint{ + Name: b.Name, + Tags: influxql.NewTags(p.Tags), + Time: p.Time.UnixNano(), + Value: p.Fields[a.field].(float64), + } + if a.topBottomInfo != nil { + // We need to populate the Aux fields + floatPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags) + } + a.aggregator.AggregateFloat(ap) + } +} + +func (a *floatPointAggregator) AggregatePoint(p *models.Point) { + ap := &influxql.FloatPoint{ + Name: p.Name, + Tags: influxql.NewTags(p.Tags), + Time: p.Time.UnixNano(), + Value: p.Fields[a.field].(float64), + } + if a.topBottomInfo != nil { + // We need to populate the Aux fields + floatPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags) + } + a.aggregator.AggregateFloat(ap) +} + +type floatPointBulkAggregator struct { + field string + topBottomInfo *pipeline.TopBottomCallInfo + aggregator pipeline.FloatBulkPointAggregator +} + +func (a *floatPointBulkAggregator) AggregateBatch(b *models.Batch) { + slice := make([]influxql.FloatPoint, len(b.Points)) + for i, p := range b.Points { + slice[i] = influxql.FloatPoint{ + Name: b.Name, + Tags: influxql.NewTags(p.Tags), + Time: p.Time.UnixNano(), + Value: p.Fields[a.field].(float64), + } + if a.topBottomInfo != nil { + // We need to populate the Aux fields + floatPopulateAuxFieldsAndTags(&slice[i], a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags) + } + } + a.aggregator.AggregateFloatBulk(slice) +} + +func (a *floatPointBulkAggregator) AggregatePoint(p *models.Point) { + ap := &influxql.FloatPoint{ + Name: p.Name, + Tags: influxql.NewTags(p.Tags), + Time: p.Time.UnixNano(), + Value: p.Fields[a.field].(float64), + } + if a.topBottomInfo != nil { + // We need to populate the Aux fields + floatPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags) + } + a.aggregator.AggregateFloat(ap) +} + +type floatPointEmitter struct { + baseReduceContext + emitter influxql.FloatPointEmitter +} + +func (e *floatPointEmitter) EmitPoint() (models.Point, error) { + slice := e.emitter.Emit() + if len(slice) != 1 { + return models.Point{}, fmt.Errorf("unexpected result from InfluxQL function, got %d points expected 1", len(slice)) + } + ap := slice[0] + var t time.Time + if e.pointTimes { + if ap.Time == influxql.ZeroTime { + t = e.time + } else { + t = time.Unix(0, ap.Time).UTC() + } + } else { + t = e.time + } + return models.Point{ + Name: e.name, + Time: t, + Group: e.group, + Dimensions: e.dimensions, + Tags: e.tags, + Fields: map[string]interface{}{e.as: ap.Value}, + }, nil +} + +func (e *floatPointEmitter) EmitBatch() models.Batch { + slice := e.emitter.Emit() + b := models.Batch{ + Name: e.name, + TMax: e.time, + Group: e.group, + Tags: e.tags, + Points: make([]models.BatchPoint, len(slice)), + 
} + var t time.Time + for i, ap := range slice { + if e.pointTimes { + if ap.Time == influxql.ZeroTime { + t = e.time + } else { + t = time.Unix(0, ap.Time).UTC() + } + } else { + t = e.time + } + b.Points[i] = models.BatchPoint{ + Time: t, + Tags: ap.Tags.KeyValues(), + Fields: map[string]interface{}{e.as: ap.Value}, + } + } + return b +} + +type integerPointAggregator struct { + field string + topBottomInfo *pipeline.TopBottomCallInfo + aggregator influxql.IntegerPointAggregator +} + +func integerPopulateAuxFieldsAndTags(ap *influxql.IntegerPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) { + ap.Aux = make([]interface{}, len(fieldsAndTags)) + for i, name := range fieldsAndTags { + if f, ok := fields[name]; ok { + ap.Aux[i] = f + } else { + ap.Aux[i] = tags[name] + } + } +} + +func (a *integerPointAggregator) AggregateBatch(b *models.Batch) { + for _, p := range b.Points { + ap := &influxql.IntegerPoint{ + Name: b.Name, + Tags: influxql.NewTags(p.Tags), + Time: p.Time.UnixNano(), + Value: p.Fields[a.field].(int64), + } + if a.topBottomInfo != nil { + // We need to populate the Aux fields + integerPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags) + } + a.aggregator.AggregateInteger(ap) + } +} + +func (a *integerPointAggregator) AggregatePoint(p *models.Point) { + ap := &influxql.IntegerPoint{ + Name: p.Name, + Tags: influxql.NewTags(p.Tags), + Time: p.Time.UnixNano(), + Value: p.Fields[a.field].(int64), + } + if a.topBottomInfo != nil { + // We need to populate the Aux fields + integerPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags) + } + a.aggregator.AggregateInteger(ap) +} + +type integerPointBulkAggregator struct { + field string + topBottomInfo *pipeline.TopBottomCallInfo + aggregator pipeline.IntegerBulkPointAggregator +} + +func (a *integerPointBulkAggregator) AggregateBatch(b *models.Batch) { + slice := make([]influxql.IntegerPoint, len(b.Points)) + for i, p := range b.Points { + slice[i] = influxql.IntegerPoint{ + Name: b.Name, + Tags: influxql.NewTags(p.Tags), + Time: p.Time.UnixNano(), + Value: p.Fields[a.field].(int64), + } + if a.topBottomInfo != nil { + // We need to populate the Aux fields + integerPopulateAuxFieldsAndTags(&slice[i], a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags) + } + } + a.aggregator.AggregateIntegerBulk(slice) +} + +func (a *integerPointBulkAggregator) AggregatePoint(p *models.Point) { + ap := &influxql.IntegerPoint{ + Name: p.Name, + Tags: influxql.NewTags(p.Tags), + Time: p.Time.UnixNano(), + Value: p.Fields[a.field].(int64), + } + if a.topBottomInfo != nil { + // We need to populate the Aux fields + integerPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags) + } + a.aggregator.AggregateInteger(ap) +} + +type integerPointEmitter struct { + baseReduceContext + emitter influxql.IntegerPointEmitter +} + +func (e *integerPointEmitter) EmitPoint() (models.Point, error) { + slice := e.emitter.Emit() + if len(slice) != 1 { + return models.Point{}, fmt.Errorf("unexpected result from InfluxQL function, got %d points expected 1", len(slice)) + } + ap := slice[0] + var t time.Time + if e.pointTimes { + if ap.Time == influxql.ZeroTime { + t = e.time + } else { + t = time.Unix(0, ap.Time).UTC() + } + } else { + t = e.time + } + return models.Point{ + Name: e.name, + Time: t, + Group: e.group, + Dimensions: e.dimensions, + Tags: e.tags, + Fields: map[string]interface{}{e.as: ap.Value}, + }, nil +} + +func (e *integerPointEmitter) EmitBatch() models.Batch { + slice 
:= e.emitter.Emit() + b := models.Batch{ + Name: e.name, + TMax: e.time, + Group: e.group, + Tags: e.tags, + Points: make([]models.BatchPoint, len(slice)), + } + var t time.Time + for i, ap := range slice { + if e.pointTimes { + if ap.Time == influxql.ZeroTime { + t = e.time + } else { + t = time.Unix(0, ap.Time).UTC() + } + } else { + t = e.time + } + b.Points[i] = models.BatchPoint{ + Time: t, + Tags: ap.Tags.KeyValues(), + Fields: map[string]interface{}{e.as: ap.Value}, + } + } + return b +} + +// floatReduceContext uses composition to implement the reduceContext interface +type floatReduceContext struct { + floatPointAggregator + floatPointEmitter +} + +// floatBulkReduceContext uses composition to implement the reduceContext interface +type floatBulkReduceContext struct { + floatPointBulkAggregator + floatPointEmitter +} + +// floatIntegerReduceContext uses composition to implement the reduceContext interface +type floatIntegerReduceContext struct { + floatPointAggregator + integerPointEmitter +} + +// floatBulkIntegerReduceContext uses composition to implement the reduceContext interface +type floatBulkIntegerReduceContext struct { + floatPointBulkAggregator + integerPointEmitter +} + +// integerFloatReduceContext uses composition to implement the reduceContext interface +type integerFloatReduceContext struct { + integerPointAggregator + floatPointEmitter +} + +// integerBulkFloatReduceContext uses composition to implement the reduceContext interface +type integerBulkFloatReduceContext struct { + integerPointBulkAggregator + floatPointEmitter +} + +// integerReduceContext uses composition to implement the reduceContext interface +type integerReduceContext struct { + integerPointAggregator + integerPointEmitter +} + +// integerBulkReduceContext uses composition to implement the reduceContext interface +type integerBulkReduceContext struct { + integerPointBulkAggregator + integerPointEmitter +} + +func determineReduceContextCreateFn(method string, value interface{}, rc pipeline.ReduceCreater) (fn createReduceContextFunc, err error) { + switch value.(type) { + + case float64: + switch { + + case rc.CreateFloatReducer != nil: + fn = func(c baseReduceContext) reduceContext { + a, e := rc.CreateFloatReducer() + return &floatReduceContext{ + floatPointAggregator: floatPointAggregator{ + field: c.field, + topBottomInfo: rc.TopBottomCallInfo, + aggregator: a, + }, + floatPointEmitter: floatPointEmitter{ + baseReduceContext: c, + emitter: e, + }, + } + } + case rc.CreateFloatBulkReducer != nil: + fn = func(c baseReduceContext) reduceContext { + a, e := rc.CreateFloatBulkReducer() + return &floatBulkReduceContext{ + floatPointBulkAggregator: floatPointBulkAggregator{ + field: c.field, + topBottomInfo: rc.TopBottomCallInfo, + aggregator: a, + }, + floatPointEmitter: floatPointEmitter{ + baseReduceContext: c, + emitter: e, + }, + } + } + + case rc.CreateFloatIntegerReducer != nil: + fn = func(c baseReduceContext) reduceContext { + a, e := rc.CreateFloatIntegerReducer() + return &floatIntegerReduceContext{ + floatPointAggregator: floatPointAggregator{ + field: c.field, + topBottomInfo: rc.TopBottomCallInfo, + aggregator: a, + }, + integerPointEmitter: integerPointEmitter{ + baseReduceContext: c, + emitter: e, + }, + } + } + case rc.CreateFloatBulkIntegerReducer != nil: + fn = func(c baseReduceContext) reduceContext { + a, e := rc.CreateFloatBulkIntegerReducer() + return &floatBulkIntegerReduceContext{ + floatPointBulkAggregator: floatPointBulkAggregator{ + field: c.field, + topBottomInfo: 
rc.TopBottomCallInfo, + aggregator: a, + }, + integerPointEmitter: integerPointEmitter{ + baseReduceContext: c, + emitter: e, + }, + } + } + + default: + err = fmt.Errorf("cannot apply %s to float64 field", method) + } + + case int64: + switch { + + case rc.CreateIntegerFloatReducer != nil: + fn = func(c baseReduceContext) reduceContext { + a, e := rc.CreateIntegerFloatReducer() + return &integerFloatReduceContext{ + integerPointAggregator: integerPointAggregator{ + field: c.field, + topBottomInfo: rc.TopBottomCallInfo, + aggregator: a, + }, + floatPointEmitter: floatPointEmitter{ + baseReduceContext: c, + emitter: e, + }, + } + } + case rc.CreateIntegerBulkFloatReducer != nil: + fn = func(c baseReduceContext) reduceContext { + a, e := rc.CreateIntegerBulkFloatReducer() + return &integerBulkFloatReduceContext{ + integerPointBulkAggregator: integerPointBulkAggregator{ + field: c.field, + topBottomInfo: rc.TopBottomCallInfo, + aggregator: a, + }, + floatPointEmitter: floatPointEmitter{ + baseReduceContext: c, + emitter: e, + }, + } + } + + case rc.CreateIntegerReducer != nil: + fn = func(c baseReduceContext) reduceContext { + a, e := rc.CreateIntegerReducer() + return &integerReduceContext{ + integerPointAggregator: integerPointAggregator{ + field: c.field, + topBottomInfo: rc.TopBottomCallInfo, + aggregator: a, + }, + integerPointEmitter: integerPointEmitter{ + baseReduceContext: c, + emitter: e, + }, + } + } + case rc.CreateIntegerBulkReducer != nil: + fn = func(c baseReduceContext) reduceContext { + a, e := rc.CreateIntegerBulkReducer() + return &integerBulkReduceContext{ + integerPointBulkAggregator: integerPointBulkAggregator{ + field: c.field, + topBottomInfo: rc.TopBottomCallInfo, + aggregator: a, + }, + integerPointEmitter: integerPointEmitter{ + baseReduceContext: c, + emitter: e, + }, + } + } + + default: + err = fmt.Errorf("cannot apply %s to int64 field", method) + } + + default: + err = fmt.Errorf("invalid field type: %T", value) + } + return +} diff --git a/influxql.gen.go.tmpl b/influxql.gen.go.tmpl new file mode 100644 index 000000000..8979aec40 --- /dev/null +++ b/influxql.gen.go.tmpl @@ -0,0 +1,231 @@ +package kapacitor + + +import ( + "fmt" + "time" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/kapacitor/models" + "github.com/influxdata/kapacitor/pipeline" +) + +{{/* Define typed Aggregate/Emit types */}} +{{range .}} + +type {{.name}}PointAggregator struct { + field string + topBottomInfo *pipeline.TopBottomCallInfo + aggregator influxql.{{.Name}}PointAggregator +} + +func {{.name}}PopulateAuxFieldsAndTags(ap *influxql.{{.Name}}Point, fieldsAndTags []string, fields models.Fields, tags models.Tags) { + ap.Aux = make([]interface{}, len(fieldsAndTags)) + for i, name := range fieldsAndTags { + if f, ok := fields[name]; ok { + ap.Aux[i] = f + } else { + ap.Aux[i] = tags[name] + } + } +} + +func (a *{{.name}}PointAggregator) AggregateBatch(b *models.Batch) { + for _, p := range b.Points { + ap := &influxql.{{.Name}}Point{ + Name: b.Name, + Tags: influxql.NewTags(p.Tags), + Time: p.Time.UnixNano(), + Value: p.Fields[a.field].({{.Type}}), + } + if a.topBottomInfo != nil { + // We need to populate the Aux fields + {{.name}}PopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags) + } + a.aggregator.Aggregate{{.Name}}(ap) + } +} + +func (a *{{.name}}PointAggregator) AggregatePoint(p *models.Point) { + ap := &influxql.{{.Name}}Point{ + Name: p.Name, + Tags: influxql.NewTags(p.Tags), + Time: p.Time.UnixNano(), + Value: 
p.Fields[a.field].({{.Type}}),
+	}
+	if a.topBottomInfo != nil {
+		// We need to populate the Aux fields
+		{{.name}}PopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
+	}
+	a.aggregator.Aggregate{{.Name}}(ap)
+}
+
+
+
+type {{.name}}PointBulkAggregator struct {
+	field         string
+	topBottomInfo *pipeline.TopBottomCallInfo
+	aggregator    pipeline.{{.Name}}BulkPointAggregator
+}
+
+func (a *{{.name}}PointBulkAggregator) AggregateBatch(b *models.Batch) {
+	slice := make([]influxql.{{.Name}}Point, len(b.Points))
+	for i, p := range b.Points {
+		slice[i] = influxql.{{.Name}}Point{
+			Name:  b.Name,
+			Tags:  influxql.NewTags(p.Tags),
+			Time:  p.Time.UnixNano(),
+			Value: p.Fields[a.field].({{.Type}}),
+		}
+		if a.topBottomInfo != nil {
+			// We need to populate the Aux fields
+			{{.name}}PopulateAuxFieldsAndTags(&slice[i], a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
+		}
+	}
+	a.aggregator.Aggregate{{.Name}}Bulk(slice)
+}
+
+func (a *{{.name}}PointBulkAggregator) AggregatePoint(p *models.Point) {
+	ap := &influxql.{{.Name}}Point{
+		Name:  p.Name,
+		Tags:  influxql.NewTags(p.Tags),
+		Time:  p.Time.UnixNano(),
+		Value: p.Fields[a.field].({{.Type}}),
+	}
+	if a.topBottomInfo != nil {
+		// We need to populate the Aux fields
+		{{.name}}PopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
+	}
+	a.aggregator.Aggregate{{.Name}}(ap)
+}
+
+type {{.name}}PointEmitter struct {
+	baseReduceContext
+	emitter influxql.{{.Name}}PointEmitter
+}
+
+func (e *{{.name}}PointEmitter) EmitPoint() (models.Point, error) {
+	slice := e.emitter.Emit()
+	if len(slice) != 1 {
+		return models.Point{}, fmt.Errorf("unexpected result from InfluxQL function, got %d points expected 1", len(slice))
+	}
+	ap := slice[0]
+	var t time.Time
+	if e.pointTimes {
+		if ap.Time == influxql.ZeroTime {
+			t = e.time
+		} else {
+			t = time.Unix(0, ap.Time).UTC()
+		}
+	} else {
+		t = e.time
+	}
+	return models.Point{
+		Name:       e.name,
+		Time:       t,
+		Group:      e.group,
+		Dimensions: e.dimensions,
+		Tags:       e.tags,
+		Fields:     map[string]interface{}{e.as: ap.Value},
+	}, nil
+}
+
+func (e *{{.name}}PointEmitter) EmitBatch() models.Batch {
+	slice := e.emitter.Emit()
+	b := models.Batch{
+		Name:   e.name,
+		TMax:   e.time,
+		Group:  e.group,
+		Tags:   e.tags,
+		Points: make([]models.BatchPoint, len(slice)),
+	}
+	var t time.Time
+	for i, ap := range slice {
+		if e.pointTimes {
+			if ap.Time == influxql.ZeroTime {
+				t = e.time
+			} else {
+				t = time.Unix(0, ap.Time).UTC()
+			}
+		} else {
+			t = e.time
+		}
+		b.Points[i] = models.BatchPoint{
+			Time:   t,
+			Tags:   ap.Tags.KeyValues(),
+			Fields: map[string]interface{}{e.as: ap.Value},
+		}
+	}
+	return b
+}
+
+{{end}}
+
+{{/* Define composite types for reduceContext */}}
+{{with $types := .}}
+{{range $a := $types}}
+{{range $e := $types}}
+
+// {{$a.name}}{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}ReduceContext uses composition to implement the reduceContext interface
+type {{$a.name}}{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}ReduceContext struct {
+	{{$a.name}}PointAggregator
+	{{$e.name}}PointEmitter
+}
+
+// {{$a.name}}Bulk{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}ReduceContext uses composition to implement the reduceContext interface
+type {{$a.name}}Bulk{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}ReduceContext struct {
+	{{$a.name}}PointBulkAggregator
+	{{$e.name}}PointEmitter
+}
+{{end}}{{end}}
+
+
+{{/* Define switch cases for reduceContext construction */}}
+
+func determineReduceContextCreateFn(method string, value interface{}, rc pipeline.ReduceCreater) (fn 
createReduceContextFunc, err error) {
+	switch value.(type) {
+{{range $a := $types}}
+	case {{.Type}}:
+		switch {
+{{range $e := $types}}
+		case rc.Create{{$a.Name}}{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}Reducer != nil:
+			fn = func(c baseReduceContext) reduceContext {
+				a, e := rc.Create{{$a.Name}}{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}Reducer()
+				return &{{$a.name}}{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}ReduceContext{
+					{{$a.name}}PointAggregator: {{$a.name}}PointAggregator{
+						field:         c.field,
+						topBottomInfo: rc.TopBottomCallInfo,
+						aggregator:    a,
+					},
+					{{$e.name}}PointEmitter: {{$e.name}}PointEmitter{
+						baseReduceContext: c,
+						emitter:           e,
+					},
+				}
+			}
+		case rc.Create{{$a.Name}}Bulk{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}Reducer != nil:
+			fn = func(c baseReduceContext) reduceContext {
+				a, e := rc.Create{{$a.Name}}Bulk{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}Reducer()
+				return &{{$a.name}}Bulk{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}ReduceContext{
+					{{$a.name}}PointBulkAggregator: {{$a.name}}PointBulkAggregator{
+						field:         c.field,
+						topBottomInfo: rc.TopBottomCallInfo,
+						aggregator:    a,
+					},
+					{{$e.name}}PointEmitter: {{$e.name}}PointEmitter{
+						baseReduceContext: c,
+						emitter:           e,
+					},
+				}
+			}
+{{end}}
+		default:
+			err = fmt.Errorf("cannot apply %s to {{$a.Type}} field", method)
+		}
+{{end}}
+	default:
+		err = fmt.Errorf("invalid field type: %T", value)
+	}
+	return
+}
+{{end}}
diff --git a/influxql.go b/influxql.go
new file mode 100644
index 000000000..b1268f921
--- /dev/null
+++ b/influxql.go
@@ -0,0 +1,181 @@
+package kapacitor
+
+import (
+	"fmt"
+	"log"
+	"time"
+
+	"github.com/influxdata/kapacitor/models"
+	"github.com/influxdata/kapacitor/pipeline"
+)
+
+// tmpl -- go get github.com/benbjohnson/tmpl
+//go:generate tmpl -data=@tmpldata influxql.gen.go.tmpl
+
+type createReduceContextFunc func(c baseReduceContext) reduceContext
+
+type InfluxQLNode struct {
+	node
+	n        *pipeline.InfluxQLNode
+	createFn createReduceContextFunc
+}
+
+func newInfluxQLNode(et *ExecutingTask, n *pipeline.InfluxQLNode, l *log.Logger) (*InfluxQLNode, error) {
+	m := &InfluxQLNode{
+		node: node{Node: n, et: et, logger: l},
+		n:    n,
+	}
+	m.node.runF = m.runInfluxQLs
+	return m, nil
+}
+
+func (n *InfluxQLNode) runInfluxQLs([]byte) error {
+	switch n.n.Wants() {
+	case pipeline.StreamEdge:
+		return n.runStreamInfluxQL()
+	case pipeline.BatchEdge:
+		return n.runBatchInfluxQL()
+	default:
+		return fmt.Errorf("cannot map %v edge", n.n.Wants())
+	}
+}
+
+type reduceContext interface {
+	AggregatePoint(p *models.Point)
+	AggregateBatch(b *models.Batch)
+	EmitPoint() (models.Point, error)
+	EmitBatch() models.Batch
+	Time() time.Time
+}
+
+type baseReduceContext struct {
+	as            string
+	field         string
+	name          string
+	group         models.GroupID
+	dimensions    models.Dimensions
+	tags          models.Tags
+	time          time.Time
+	pointTimes    bool
+	topBottomInfo *pipeline.TopBottomCallInfo
+}
+
+func (c *baseReduceContext) Time() time.Time {
+	return c.time
+}
+
+func (n *InfluxQLNode) runStreamInfluxQL() error {
+	contexts := make(map[models.GroupID]reduceContext)
+	for p, ok := n.ins[0].NextPoint(); ok; {
+		context := contexts[p.Group]
+		// First point in window
+		if context == nil {
+			// Create new context
+			c := baseReduceContext{
+				as:         n.n.As,
+				field:      n.n.Field,
+				name:       p.Name,
+				group:      p.Group,
+				dimensions: p.Dimensions,
+				tags:       p.PointTags(),
+				time:       p.Time,
+				pointTimes: n.n.PointTimes,
+			}
+
+			createFn, err := n.getCreateFn(p.Fields[c.field])
+			if err != nil {
+				return err
+			}
+
+			context = createFn(c)
+			
contexts[p.Group] = context
+		}
+		// Aggregate the point; this also covers the first point of a new context.
+		if p.Time.Equal(context.Time()) {
+			context.AggregatePoint(&p)
+			// advance to next point
+			p, ok = n.ins[0].NextPoint()
+		} else {
+			err := n.emit(context)
+			if err != nil {
+				return err
+			}
+
+			// Nil out reduced point
+			contexts[p.Group] = nil
+			// do not advance,
+			// go through the loop again to initialize a new context.
+		}
+	}
+	return nil
+}
+
+func (n *InfluxQLNode) runBatchInfluxQL() error {
+	for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
+		// Skip empty batches
+		if len(b.Points) == 0 {
+			continue
+		}
+
+		// Create new base context
+		c := baseReduceContext{
+			as:         n.n.As,
+			field:      n.n.Field,
+			name:       b.Name,
+			group:      b.Group,
+			dimensions: b.PointDimensions(),
+			tags:       b.Tags,
+			time:       b.TMax,
+			pointTimes: n.n.PointTimes,
+		}
+		createFn, err := n.getCreateFn(b.Points[0].Fields[c.field])
+		if err != nil {
+			return err
+		}
+
+		context := createFn(c)
+		context.AggregateBatch(&b)
+		err = n.emit(context)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (n *InfluxQLNode) getCreateFn(value interface{}) (createReduceContextFunc, error) {
+	if n.createFn != nil {
+		return n.createFn, nil
+	}
+	createFn, err := determineReduceContextCreateFn(n.n.Method, value, n.n.ReduceCreater)
+	if err != nil {
+		return nil, err
+	}
+	n.createFn = createFn
+	return n.createFn, nil
+}
+
+func (n *InfluxQLNode) emit(context reduceContext) error {
+	switch n.Provides() {
+	case pipeline.StreamEdge:
+		p, err := context.EmitPoint()
+		if err != nil {
+			return err
+		}
+		for _, out := range n.outs {
+			err := out.CollectPoint(p)
+			if err != nil {
+				return err
+			}
+		}
+	case pipeline.BatchEdge:
+		b := context.EmitBatch()
+		for _, out := range n.outs {
+			err := out.CollectBatch(b)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
diff --git a/integrations/data/TestStream_Aggregations.srpl b/integrations/data/TestStream_InfluxQL.srpl
similarity index 100%
rename from integrations/data/TestStream_Aggregations.srpl
rename to integrations/data/TestStream_InfluxQL.srpl
diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go
index fd1812a80..8ef4ce18e 100644
--- a/integrations/streamer_test.go
+++ b/integrations/streamer_test.go
@@ -1333,7 +1333,7 @@ cpu.union(mem, disk)
 	testStreamerWithOutput(t, "TestStream_Union", script, 15*time.Second, er, nil, false)
 }
 
-func TestStream_Aggregations(t *testing.T) {
+func TestStream_InfluxQL(t *testing.T) {
 
 	type testCase struct {
 		Method        string
@@ -1349,14 +1349,26 @@ stream
 	.window()
 		.period(10s)
 		.every(10s)
-	.mapReduce({{ .Method }}({{ .Args }}))
+	.mapReduce(influxql.{{ .Method }}({{ .Args }}))
 		{{ if .UsePointTimes }}.usePointTimes(){{ end }}
-	.httpOut('TestStream_Aggregations')
+	.httpOut('TestStream_InfluxQL')
+`
+
+	var newScriptTmpl = `
+stream
+	.from().measurement('cpu')
+	.where(lambda: "host" == 'serverA')
+	.window()
+		.period(10s)
+		.every(10s)
+	.{{ .Method }}({{ .Args }})
+		{{ if .UsePointTimes }}.usePointTimes(){{ end }}
+	.httpOut('TestStream_InfluxQL')
 `
 	endTime := time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC)
 	testCases := []testCase{
 		testCase{
-			Method: "influxql.sum",
+			Method: "sum",
 			ER: kapacitor.Result{
 				Series: imodels.Rows{
 					{
@@ -1372,7 +1384,7 @@
 			},
 		},
 		testCase{
-			Method: "influxql.count",
+			Method: "count",
 			ER: kapacitor.Result{
 				Series: imodels.Rows{
 					{
@@ -1388,7 +1400,7 @@
 			},
 		},
 		testCase{
-			Method: "influxql.distinct",
+			Method: "distinct",
 			ER: kapacitor.Result{
 				Series: imodels.Rows{
 					{
@@ -1398,27 +1410,27 @@
 					
Values: [][]interface{}{ { endTime, - 91.0, + 98.0, }, { endTime, - 92.0, + 91.0, }, { endTime, - 93.0, + 95.0, }, { endTime, - 95.0, + 93.0, }, { endTime, - 96.0, + 92.0, }, { endTime, - 98.0, + 96.0, }, }, }, @@ -1426,7 +1438,7 @@ stream }, }, testCase{ - Method: "influxql.mean", + Method: "mean", ER: kapacitor.Result{ Series: imodels.Rows{ { @@ -1442,7 +1454,7 @@ stream }, }, testCase{ - Method: "influxql.median", + Method: "median", ER: kapacitor.Result{ Series: imodels.Rows{ { @@ -1458,7 +1470,7 @@ stream }, }, testCase{ - Method: "influxql.min", + Method: "min", UsePointTimes: true, ER: kapacitor.Result{ Series: imodels.Rows{ @@ -1475,7 +1487,7 @@ stream }, }, testCase{ - Method: "influxql.min", + Method: "min", ER: kapacitor.Result{ Series: imodels.Rows{ { @@ -1491,7 +1503,7 @@ stream }, }, testCase{ - Method: "influxql.max", + Method: "max", UsePointTimes: true, ER: kapacitor.Result{ Series: imodels.Rows{ @@ -1508,7 +1520,7 @@ stream }, }, testCase{ - Method: "influxql.max", + Method: "max", ER: kapacitor.Result{ Series: imodels.Rows{ { @@ -1524,7 +1536,7 @@ stream }, }, testCase{ - Method: "influxql.spread", + Method: "spread", ER: kapacitor.Result{ Series: imodels.Rows{ { @@ -1540,7 +1552,7 @@ stream }, }, testCase{ - Method: "influxql.stddev", + Method: "stddev", ER: kapacitor.Result{ Series: imodels.Rows{ { @@ -1556,7 +1568,7 @@ stream }, }, testCase{ - Method: "influxql.first", + Method: "first", UsePointTimes: true, ER: kapacitor.Result{ Series: imodels.Rows{ @@ -1573,7 +1585,7 @@ stream }, }, testCase{ - Method: "influxql.first", + Method: "first", ER: kapacitor.Result{ Series: imodels.Rows{ { @@ -1589,7 +1601,7 @@ stream }, }, testCase{ - Method: "influxql.last", + Method: "last", UsePointTimes: true, ER: kapacitor.Result{ Series: imodels.Rows{ @@ -1606,7 +1618,7 @@ stream }, }, testCase{ - Method: "influxql.last", + Method: "last", ER: kapacitor.Result{ Series: imodels.Rows{ { @@ -1622,7 +1634,7 @@ stream }, }, testCase{ - Method: "influxql.percentile", + Method: "percentile", Args: "'value', 50.0", ER: kapacitor.Result{ Series: imodels.Rows{ @@ -1639,7 +1651,7 @@ stream }, }, testCase{ - Method: "influxql.top", + Method: "top", UsePointTimes: true, Args: "2, 'value'", ER: kapacitor.Result{ @@ -1667,7 +1679,7 @@ stream }, }, testCase{ - Method: "influxql.top", + Method: "top", Args: "2, 'value'", ER: kapacitor.Result{ Series: imodels.Rows{ @@ -1694,7 +1706,7 @@ stream }, }, testCase{ - Method: "influxql.bottom", + Method: "bottom", UsePointTimes: true, Args: "3, 'value'", ER: kapacitor.Result{ @@ -1728,7 +1740,7 @@ stream }, }, testCase{ - Method: "influxql.bottom", + Method: "bottom", Args: "3, 'value'", ER: kapacitor.Result{ Series: imodels.Rows{ @@ -1767,22 +1779,35 @@ stream t.Fatal(err) } + newTmpl, err := template.New("script").Parse(newScriptTmpl) + if err != nil { + t.Fatal(err) + } + + tmpls := []*template.Template{tmpl, newTmpl} + for _, tc := range testCases { - t.Log("Method:", tc.Method) - var script bytes.Buffer - if tc.Args == "" { - tc.Args = "'value'" - } - tmpl.Execute(&script, tc) - testStreamerWithOutput( - t, - "TestStream_Aggregations", - string(script.Bytes()), - 13*time.Second, - tc.ER, - nil, - false, - ) + for i, tmpl := range tmpls { + if tc.Method == "distinct" && i == 0 { + // Skip legacy test for new behavior + continue + } + t.Log("Method:", tc.Method, i) + var script bytes.Buffer + if tc.Args == "" { + tc.Args = "'value'" + } + tmpl.Execute(&script, tc) + testStreamerWithOutput( + t, + "TestStream_InfluxQL", + string(script.Bytes()), + 
13*time.Second,
+				tc.ER,
+				nil,
+				false,
+			)
+		}
 	}
 }
 
@@ -3019,7 +3044,7 @@ var topScores = stream
 	.mapReduce(influxql.last('value'))
 	// Calculate the top 5 scores per game
 	.groupBy('game')
-	.mapReduce(influxql.top(5, 'last', 'player'))
+	.top(5, 'last', 'player')
 
 topScores
 	.httpOut('top_scores')
diff --git a/map_reduce.go b/map_reduce.go
index 56610383b..c88ff3273 100644
--- a/map_reduce.go
+++ b/map_reduce.go
@@ -36,6 +36,7 @@ type MapNode struct {
 }
 
 func newMapNode(et *ExecutingTask, n *pipeline.MapNode, l *log.Logger) (*MapNode, error) {
+	l.Println("W! DEPRECATED use InfluxQLNode instead")
 	f, ok := n.Map.(MapInfo)
 	if !ok {
 		return nil, fmt.Errorf("invalid map given to map node %T", n.Map)
diff --git a/pipeline/influxql.gen.go b/pipeline/influxql.gen.go
new file mode 100644
index 000000000..0e1e1f4a3
--- /dev/null
+++ b/pipeline/influxql.gen.go
@@ -0,0 +1,35 @@
+// Generated by tmpl
+// https://github.com/benbjohnson/tmpl
+//
+// DO NOT EDIT!
+// Source: influxql.gen.go.tmpl
+
+package pipeline
+
+import "github.com/influxdata/influxdb/influxql"
+
+type ReduceCreater struct {
+	CreateFloatReducer     func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter)
+	CreateFloatBulkReducer func() (FloatBulkPointAggregator, influxql.FloatPointEmitter)
+
+	CreateFloatIntegerReducer     func() (influxql.FloatPointAggregator, influxql.IntegerPointEmitter)
+	CreateFloatBulkIntegerReducer func() (FloatBulkPointAggregator, influxql.IntegerPointEmitter)
+
+	CreateIntegerFloatReducer     func() (influxql.IntegerPointAggregator, influxql.FloatPointEmitter)
+	CreateIntegerBulkFloatReducer func() (IntegerBulkPointAggregator, influxql.FloatPointEmitter)
+
+	CreateIntegerReducer     func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter)
+	CreateIntegerBulkReducer func() (IntegerBulkPointAggregator, influxql.IntegerPointEmitter)
+
+	TopBottomCallInfo *TopBottomCallInfo
+}
+
+type FloatBulkPointAggregator interface {
+	influxql.FloatPointAggregator
+	influxql.FloatBulkPointAggregator
+}
+
+type IntegerBulkPointAggregator interface {
+	influxql.IntegerPointAggregator
+	influxql.IntegerBulkPointAggregator
+}
diff --git a/pipeline/influxql.gen.go.tmpl b/pipeline/influxql.gen.go.tmpl
new file mode 100644
index 000000000..6b0ee5b9d
--- /dev/null
+++ b/pipeline/influxql.gen.go.tmpl
@@ -0,0 +1,21 @@
+package pipeline
+
+import "github.com/influxdata/influxdb/influxql"
+
+type ReduceCreater struct {
+{{with $types := .}}
+{{range $a := $types}}
+{{range $e := $types}}
+	Create{{$a.Name}}{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}Reducer func() (influxql.{{$a.Name}}PointAggregator, influxql.{{$e.Name}}PointEmitter)
+	Create{{$a.Name}}Bulk{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}Reducer func() ({{$a.Name}}BulkPointAggregator, influxql.{{$e.Name}}PointEmitter)
+{{end}}{{end}}{{end}}
+
+	TopBottomCallInfo *TopBottomCallInfo
+}
+
+{{range .}}
+type {{.Name}}BulkPointAggregator interface {
+	influxql.{{.Name}}PointAggregator
+	influxql.{{.Name}}BulkPointAggregator
+}
+{{end}}
diff --git a/pipeline/influxql.go b/pipeline/influxql.go
new file mode 100644
index 000000000..f532b64e8
--- /dev/null
+++ b/pipeline/influxql.go
@@ -0,0 +1,334 @@
+package pipeline
+
+import "github.com/influxdata/influxdb/influxql"
+
+// tmpl -- go get github.com/benbjohnson/tmpl
+//go:generate tmpl -data=@../tmpldata influxql.gen.go.tmpl
+
+// An InfluxQLNode performs the available functions from the InfluxQL language.
+// These functions can be performed on a stream or batch edge.
+// The resulting edge is dependent on the function. +// For a stream edge all points with the same time are accumulated into the function. +// For a batch edge all points in the batch are accumulated into the function. +// +// +// Example: +// stream +// .window() +// .period(10s) +// .every(10s) +// // Sum the values for each 10s window of data. +// .sum('value') +// +// +// Note: Derivative has its own implementation as a DerivativeNode instead of as part of the +// InfluxQL functions. +type InfluxQLNode struct { + chainnode + + // tick:ignore + Method string + // tick:ignore + Field string + + // The name of the field, defaults to the name of + // function used (i.e. .mean -> 'mean') + As string + + // tick:ignore + ReduceCreater ReduceCreater + + // tick:ignore + PointTimes bool +} + +func newInfluxQLNode(method, field string, wants, provides EdgeType, reducer ReduceCreater) *InfluxQLNode { + return &InfluxQLNode{ + chainnode: newBasicChainNode(method, wants, provides), + Method: method, + Field: field, + As: method, + ReduceCreater: reducer, + } +} + +// Use the time of the selected point instead of the time of the batch. +// +// Only applies to selector MR functions like first, last, top, bottom, etc. +// Aggregation functions always use the batch time. +// tick:property +func (n *InfluxQLNode) UsePointTimes() *InfluxQLNode { + n.PointTimes = true + return n +} + +//------------------------------------ +// Aggregation Functions +// + +// Count the number of points. +func (n *chainnode) Count(field string) *InfluxQLNode { + i := newInfluxQLNode("count", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatIntegerReducer: func() (influxql.FloatPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewFloatFuncIntegerReducer(influxql.FloatCountReduce) + return fn, fn + }, + CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerFuncReducer(influxql.IntegerCountReduce) + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +// Produce batch of only the distinct points. +func (n *chainnode) Distinct(field string) *InfluxQLNode { + i := newInfluxQLNode("distinct", field, n.Provides(), BatchEdge, ReduceCreater{ + CreateFloatBulkReducer: func() (FloatBulkPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatSliceFuncReducer(influxql.FloatDistinctReduceSlice) + return fn, fn + }, + CreateIntegerBulkReducer: func() (IntegerBulkPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerSliceFuncReducer(influxql.IntegerDistinctReduceSlice) + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +// Compute the mean of the data. +func (n *chainnode) Mean(field string) *InfluxQLNode { + i := newInfluxQLNode("mean", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatMeanReducer() + return fn, fn + }, + CreateIntegerFloatReducer: func() (influxql.IntegerPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewIntegerMeanReducer() + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +// Compute the median of the data. Note, this method is not a selector, +// if you want the median point use .percentile(field, 50.0). 
+func (n *chainnode) Median(field string) *InfluxQLNode { + i := newInfluxQLNode("median", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatBulkReducer: func() (FloatBulkPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatSliceFuncReducer(influxql.FloatMedianReduceSlice) + return fn, fn + }, + CreateIntegerBulkFloatReducer: func() (IntegerBulkPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewIntegerSliceFuncFloatReducer(influxql.IntegerMedianReduceSlice) + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +// Compute the difference between min and max points. +func (n *chainnode) Spread(field string) *InfluxQLNode { + i := newInfluxQLNode("spread", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatBulkReducer: func() (FloatBulkPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatSliceFuncReducer(influxql.FloatSpreadReduceSlice) + return fn, fn + }, + CreateIntegerBulkReducer: func() (IntegerBulkPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerSliceFuncReducer(influxql.IntegerSpreadReduceSlice) + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +// Compute the sum of all values. +func (n *chainnode) Sum(field string) *InfluxQLNode { + i := newInfluxQLNode("sum", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatFuncReducer(influxql.FloatSumReduce) + return fn, fn + }, + CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerFuncReducer(influxql.IntegerSumReduce) + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +//------------------------------------ +// Selection Functions +// + +// Select the first point. +func (n *chainnode) First(field string) *InfluxQLNode { + i := newInfluxQLNode("first", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatFuncReducer(influxql.FloatFirstReduce) + return fn, fn + }, + CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerFuncReducer(influxql.IntegerFirstReduce) + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +// Select the last point. +func (n *chainnode) Last(field string) *InfluxQLNode { + i := newInfluxQLNode("last", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatFuncReducer(influxql.FloatLastReduce) + return fn, fn + }, + CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerFuncReducer(influxql.IntegerLastReduce) + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +// Select the minimum point. +func (n *chainnode) Min(field string) *InfluxQLNode { + i := newInfluxQLNode("min", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatFuncReducer(influxql.FloatMinReduce) + return fn, fn + }, + CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerFuncReducer(influxql.IntegerMinReduce) + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +// Select the maximum point. 
+func (n *chainnode) Max(field string) *InfluxQLNode { + i := newInfluxQLNode("max", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatFuncReducer(influxql.FloatMaxReduce) + return fn, fn + }, + CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerFuncReducer(influxql.IntegerMaxReduce) + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +// Select a point at the given percentile. This is a selector function, no interpolation between points is performed. +func (n *chainnode) Percentile(field string, percentile float64) *InfluxQLNode { + i := newInfluxQLNode("percentile", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatBulkReducer: func() (FloatBulkPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatSliceFuncReducer(influxql.NewFloatPercentileReduceSliceFunc(percentile)) + return fn, fn + }, + CreateIntegerBulkReducer: func() (IntegerBulkPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerSliceFuncReducer(influxql.NewIntegerPercentileReduceSliceFunc(percentile)) + return fn, fn + }, + }) + n.linkChild(i) + return i +} + +type TopBottomCallInfo struct { + FieldsAndTags []string +} + +// Select the top `num` points for `field` and sort by any extra tags or fields. +func (n *chainnode) Top(num int64, field string, fieldsAndTags ...string) *InfluxQLNode { + tags := make([]int, len(fieldsAndTags)) + for i := range fieldsAndTags { + tags[i] = i + } + i := newInfluxQLNode("top", field, n.Provides(), BatchEdge, ReduceCreater{ + CreateFloatBulkReducer: func() (FloatBulkPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatSliceFuncReducer(influxql.NewFloatTopReduceSliceFunc( + int(num), + tags, + influxql.Interval{}, + )) + return fn, fn + }, + CreateIntegerBulkReducer: func() (IntegerBulkPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerSliceFuncReducer(influxql.NewIntegerTopReduceSliceFunc( + int(num), + tags, + influxql.Interval{}, + )) + return fn, fn + }, + TopBottomCallInfo: &TopBottomCallInfo{ + FieldsAndTags: fieldsAndTags, + }, + }) + n.linkChild(i) + return i +} + +// Select the bottom `num` points for `field` and sort by any extra tags or fields. +func (n *chainnode) Bottom(num int64, field string, fieldsAndTags ...string) *InfluxQLNode { + tags := make([]int, len(fieldsAndTags)) + for i := range fieldsAndTags { + tags[i] = i + } + i := newInfluxQLNode("bottom", field, n.Provides(), BatchEdge, ReduceCreater{ + CreateFloatBulkReducer: func() (FloatBulkPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatSliceFuncReducer(influxql.NewFloatBottomReduceSliceFunc( + int(num), + tags, + influxql.Interval{}, + )) + return fn, fn + }, + CreateIntegerBulkReducer: func() (IntegerBulkPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerSliceFuncReducer(influxql.NewIntegerBottomReduceSliceFunc( + int(num), + tags, + influxql.Interval{}, + )) + return fn, fn + }, + TopBottomCallInfo: &TopBottomCallInfo{ + FieldsAndTags: fieldsAndTags, + }, + }) + n.linkChild(i) + return i +} + +//------------------------------------ +// Transformation Functions +// + +// Compute the standard deviation. 
+func (n *chainnode) Stddev(field string) *InfluxQLNode { + i := newInfluxQLNode("stddev", field, n.Provides(), StreamEdge, ReduceCreater{ + CreateFloatBulkReducer: func() (FloatBulkPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatSliceFuncReducer(influxql.FloatStddevReduceSlice) + return fn, fn + }, + CreateIntegerBulkFloatReducer: func() (IntegerBulkPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewIntegerSliceFuncFloatReducer(influxql.IntegerStddevReduceSlice) + return fn, fn + }, + }) + n.linkChild(i) + return i +} diff --git a/pipeline/map_reduce.go b/pipeline/map_reduce.go index 6112477df..31f7a50fb 100644 --- a/pipeline/map_reduce.go +++ b/pipeline/map_reduce.go @@ -7,6 +7,9 @@ type MapReduceInfo struct { Edge EdgeType } +// DEPRECATION WARNING: As of v0.11 you can use the new InfluxQLNode to perform map reduce functions. +// This way of performing influxql functions will be removed in the 0.12 release. +// // Performs a map operation on the data stream. // In the map-reduce framework it is assumed that // several different partitions of the data can be @@ -35,6 +38,9 @@ func newMapNode(wants EdgeType, i interface{}) *MapNode { } } +// DEPRECATION WARNING: As of v0.11 you can use the new InfluxQLNode to perform map reduce functions. +// This way of performing influxql functions will be removed in the 0.12 release. +// // Performs a reduce operation on the data stream. // In the map-reduce framework it is assumed that // several different partitions of the data can be diff --git a/pipeline/node.go b/pipeline/node.go index 5390b28f8..24d3c64e1 100644 --- a/pipeline/node.go +++ b/pipeline/node.go @@ -359,36 +359,9 @@ func (n *chainnode) GroupBy(tag ...interface{}) *GroupByNode { return g } -// Curently you must use MapReduce -//// Perform just the map step of a map-reduce operation. -//// A map step must always be followed by a reduce step. -//// See Apply for performing simple transformations. -//// See MapReduce for performing map-reduce in one command. -//// -//// NOTE: Map can only be applied to batch edges. -//func (n *chainnode) Map(f interface{}) (c *MapNode) { -// if n.Provides() != BatchEdge { -// panic("cannot MapReduce non batch edge, did you forget to window the data?") -// } -// c = newMapNode(f) -// n.linkChild(c) -// return c -//} +// DEPRECATION WARNING: As of v0.11 you can use the new InfluxQLNode to perform map reduce functions. +// This way of performing influxql functions will be removed in the 0.12 release. // -//// Perform just the reduce step of a map-reduce operation. -//// -//// NOTE: Reduce can only be applied to map edges. -//func (n *chainnode) Reduce(f interface{}) (c *ReduceNode) { -// switch n.Provides() { -// case ReduceEdge: -// c = newReduceNode(f) -// default: -// panic("cannot Reduce non reduce edge, did you forget to map the data?") -// } -// n.linkChild(c) -// return c -//} - // Perform a map-reduce operation on the data. 
// The built-in functions under `influxql` provide the // selection,aggregation, and transformation functions diff --git a/task.go b/task.go index 2dae9ab56..5caed3e75 100644 --- a/task.go +++ b/task.go @@ -404,6 +404,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, l *log.Logger) (Node, error return newShiftNode(et, t, l) case *pipeline.NoOpNode: return newNoOpNode(et, t, l) + case *pipeline.InfluxQLNode: + return newInfluxQLNode(et, t, l) default: return nil, fmt.Errorf("unknown pipeline node type %T", p) } diff --git a/tick/eval.go b/tick/eval.go index ce49f6419..b51d0875e 100644 --- a/tick/eval.go +++ b/tick/eval.go @@ -303,7 +303,7 @@ func NewReflectionDescriber(obj interface{}) *ReflectionDescriber { } func (r *ReflectionDescriber) Desc() string { - return reflect.TypeOf(r.obj).Name() + return fmt.Sprintf("%T", r.obj) } // Using reflection check if the object has the method or field. diff --git a/tick/node.go b/tick/node.go index 706b863c9..3d1e3384c 100644 --- a/tick/node.go +++ b/tick/node.go @@ -7,7 +7,7 @@ import ( "strconv" "time" - "github.com/influxdb/influxdb/influxql" + "github.com/influxdata/influxdb/influxql" ) type unboundFunc func(obj interface{}) (interface{}, error) diff --git a/tmpldata b/tmpldata new file mode 100644 index 000000000..971e96428 --- /dev/null +++ b/tmpldata @@ -0,0 +1,17 @@ +[ + { + "Name":"Float", + "name":"float", + "Type":"float64", + "Nil":"0", + "Zero":"float64(0)" + }, + { + "Name":"Integer", + "name":"integer", + "Type":"int64", + "Nil":"0", + "Zero":"int64(0)" + } +] +
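
The release notes above say that a simple regex could fix all of your existing scripts. For reference, here is a minimal sketch of that migration in Python (assumptions: your TICKscripts live in the current directory with a `.tick` extension, and each deprecated `.mapReduce(influxql.fn(args))` call fits on a single line; the glob and in-place rewrite are hypothetical conveniences, so back up your scripts first):

```python
import glob
import re

# Rewrite .mapReduce(influxql.fn(args)) to .fn(args), mirroring the
# before/after example in the release notes above.
PATTERN = re.compile(r"\.mapReduce\(influxql\.([a-zA-Z]*)\((.*)\)\)")

for path in glob.glob("*.tick"):  # hypothetical location of your TICKscripts
    with open(path) as f:
        src = f.read()
    with open(path, "w") as f:
        f.write(PATTERN.sub(r".\1(\2)", src))
```

For example, `.mapReduce(influxql.count('value'))` becomes `.count('value')`, matching the migration shown in the CHANGELOG.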