Skip to content

Commit

Permalink
Merge pull request #280 from influxdata/nc-issue#249
Browse files Browse the repository at this point in the history
Integrate new query engine from InfluxDB
  • Loading branch information
Nathaniel Cook committed Mar 8, 2016
2 parents a5cf89b + b28ab64 commit f6c56b7
Show file tree
Hide file tree
Showing 18 changed files with 1,434 additions and 82 deletions.
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,31 @@

### Release Notes

Kapacitor is now using the functions from the new query engine in InfluxDB core.
Along with this change is a change in the TICKscript API so that using the InfluxQL functions is easier.
Simply call the desired method directly no need to call `.mapReduce` explicitly.
This change now hides the mapReduce aspect and handles it internally.
Using `.mapReduce` is officially deprecated in this release and will be remove in the next major release.
We feel that this change improves the readability of TICKscripts and exposes less implementation details
to the end user.
Updating your exising TICKscripts is simple.
If previously you had code like this:

```javascript
stream.from()...
.window()...
.mapReduce(influxql.count('value'))
```
then update it to look like this:

```javascript
stream.from()...
.window()...
.count('value')
```

a simple regex could fix all your existing scripts.

Kapacitor now exposes more internal metrics for determining the performance of a given task.
The internal statistics includes a new measurement named `node` that contains any stats a node provides, tagged by the task, node, task type and kind of node (i.e. window vs union).
All nodes provide an averaged execution time for the node.
Expand All @@ -24,6 +49,7 @@ With #144 you can now join streams with differing group by dimensions.
- [#145](https://github.com/influxdata/kapacitor/issues/145): The InfluxDB Out Node now writes data to InfluxDB in buffers.
- [#215](https://github.com/influxdata/kapacitor/issues/215): Add performance metrics to nodes for average execution times and node throughput values.
- [#144](https://github.com/influxdata/kapacitor/issues/144): Can now join streams with differing dimensions using the join.On property.
- [#249](https://github.com/influxdata/kapacitor/issues/249): Can now use InfluxQL functions directly instead of via the MapReduce method. Example `stream.from().count()`.


### Bugfixes
Expand Down
1 change: 1 addition & 0 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def package_scripts(build_root):
def run_generate():
print "Running generate..."
run("go get github.com/gogo/protobuf/protoc-gen-gogo")
run("go get github.com/benbjohnson/tmpl")
run("go generate ./...")
print "Generate succeeded."
return True
Expand Down
12 changes: 6 additions & 6 deletions functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,22 @@ func (influxqlMapReducers) Percentile(field string, p float64) pipeline.MapReduc
return mr(field, "percentile", pipeline.StreamEdge, tsdb.MapEcho, r)
}

func (influxqlMapReducers) Top(limit int64, field string, fieldsOrTags ...string) pipeline.MapReduceInfo {
func (influxqlMapReducers) Top(limit int64, field string, fieldsAndTags ...string) pipeline.MapReduceInfo {
m := func(in *tsdb.MapInput) interface{} {
return tsdb.MapTopBottom(in, int(limit), fieldsOrTags, len(fieldsOrTags)+2, "top")
return tsdb.MapTopBottom(in, int(limit), fieldsAndTags, len(fieldsAndTags)+2, "top")
}
r := func(values []interface{}) interface{} {
return tsdb.ReduceTopBottom(values, int(limit), fieldsOrTags, "top")
return tsdb.ReduceTopBottom(values, int(limit), fieldsAndTags, "top")
}
return mr(field, "top", pipeline.BatchEdge, m, r)
}

func (influxqlMapReducers) Bottom(limit int64, field string, fieldsOrTags ...string) pipeline.MapReduceInfo {
func (influxqlMapReducers) Bottom(limit int64, field string, fieldsAndTags ...string) pipeline.MapReduceInfo {
m := func(in *tsdb.MapInput) interface{} {
return tsdb.MapTopBottom(in, int(limit), fieldsOrTags, len(fieldsOrTags)+2, "bottom")
return tsdb.MapTopBottom(in, int(limit), fieldsAndTags, len(fieldsAndTags)+2, "bottom")
}
r := func(values []interface{}) interface{} {
return tsdb.ReduceTopBottom(values, int(limit), fieldsOrTags, "bottom")
return tsdb.ReduceTopBottom(values, int(limit), fieldsAndTags, "bottom")
}
return mr(field, "bottom", pipeline.BatchEdge, m, r)
}
Expand Down
Loading

0 comments on commit f6c56b7

Please sign in to comment.