Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate new query engine from InfluxDB [WIP] #280

Merged
merged 2 commits into from
Mar 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,31 @@

### Release Notes

Kapacitor is now using the functions from the new query engine in InfluxDB core.
Along with this change is a change in the TICKscript API so that using the InfluxQL functions is easier.
Simply call the desired method directly no need to call `.mapReduce` explicitly.
This change now hides the mapReduce aspect and handles it internally.
Using `.mapReduce` is officially deprecated in this release and will be remove in the next major release.
We feel that this change improves the readability of TICKscripts and exposes less implementation details
to the end user.
Updating your exising TICKscripts is simple.
If previously you had code like this:

```javascript
stream.from()...
.window()...
.mapReduce(influxql.count('value'))
```
then update it to look like this:

```javascript
stream.from()...
.window()...
.count('value')
```

a simple regex could fix all your existing scripts.

Kapacitor now exposes more internal metrics for determining the performance of a given task.
The internal statistics includes a new measurement named `node` that contains any stats a node provides, tagged by the task, node, task type and kind of node (i.e. window vs union).
All nodes provide an averaged execution time for the node.
Expand All @@ -24,6 +49,7 @@ With #144 you can now join streams with differing group by dimensions.
- [#145](https://github.com/influxdata/kapacitor/issues/145): The InfluxDB Out Node now writes data to InfluxDB in buffers.
- [#215](https://github.com/influxdata/kapacitor/issues/215): Add performance metrics to nodes for average execution times and node throughput values.
- [#144](https://github.com/influxdata/kapacitor/issues/144): Can now join streams with differing dimensions using the join.On property.
- [#249](https://github.com/influxdata/kapacitor/issues/249): Can now use InfluxQL functions directly instead of via the MapReduce method. Example `stream.from().count()`.


### Bugfixes
Expand Down
1 change: 1 addition & 0 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def package_scripts(build_root):
def run_generate():
print "Running generate..."
run("go get github.com/gogo/protobuf/protoc-gen-gogo")
run("go get github.com/benbjohnson/tmpl")
run("go generate ./...")
print "Generate succeeded."
return True
Expand Down
12 changes: 6 additions & 6 deletions functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,22 @@ func (influxqlMapReducers) Percentile(field string, p float64) pipeline.MapReduc
return mr(field, "percentile", pipeline.StreamEdge, tsdb.MapEcho, r)
}

func (influxqlMapReducers) Top(limit int64, field string, fieldsOrTags ...string) pipeline.MapReduceInfo {
func (influxqlMapReducers) Top(limit int64, field string, fieldsAndTags ...string) pipeline.MapReduceInfo {
m := func(in *tsdb.MapInput) interface{} {
return tsdb.MapTopBottom(in, int(limit), fieldsOrTags, len(fieldsOrTags)+2, "top")
return tsdb.MapTopBottom(in, int(limit), fieldsAndTags, len(fieldsAndTags)+2, "top")
}
r := func(values []interface{}) interface{} {
return tsdb.ReduceTopBottom(values, int(limit), fieldsOrTags, "top")
return tsdb.ReduceTopBottom(values, int(limit), fieldsAndTags, "top")
}
return mr(field, "top", pipeline.BatchEdge, m, r)
}

func (influxqlMapReducers) Bottom(limit int64, field string, fieldsOrTags ...string) pipeline.MapReduceInfo {
func (influxqlMapReducers) Bottom(limit int64, field string, fieldsAndTags ...string) pipeline.MapReduceInfo {
m := func(in *tsdb.MapInput) interface{} {
return tsdb.MapTopBottom(in, int(limit), fieldsOrTags, len(fieldsOrTags)+2, "bottom")
return tsdb.MapTopBottom(in, int(limit), fieldsAndTags, len(fieldsAndTags)+2, "bottom")
}
r := func(values []interface{}) interface{} {
return tsdb.ReduceTopBottom(values, int(limit), fieldsOrTags, "bottom")
return tsdb.ReduceTopBottom(values, int(limit), fieldsAndTags, "bottom")
}
return mr(field, "bottom", pipeline.BatchEdge, m, r)
}
Expand Down
Loading