Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

httpout node chains #369

Merged
merged 1 commit into from
Mar 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ For example:
- [#239](https://github.com/influxdata/kapacitor/issues/239): Support more detailed TLS config when connecting to an InfluxDB host.
- [#323](https://github.com/influxdata/kapacitor/pull/323): Stats for task execution are provided via JSON HTTP request instead of just DOT string. thanks @yosiat
- [#358](https://github.com/influxdata/kapacitor/issues/358): Improved logging. Adds LogNode so any data in a pipeline can be logged.
- [#366](https://github.com/influxdata/kapacitor/issues/366): HttpOutNode now allows chaining methods.


### Bugfixes
Expand Down
12 changes: 12 additions & 0 deletions http_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,25 @@ func (h *HTTPOutNode) runOut([]byte) error {
row := models.PointToRow(p)
h.updateResultWithRow(p.Group, row)
h.timer.Stop()
for _, child := range h.outs {
err := child.CollectPoint(p)
if err != nil {
return err
}
}
}
case pipeline.BatchEdge:
for b, ok := h.ins[0].NextBatch(); ok; b, ok = h.ins[0].NextBatch() {
h.timer.Start()
row := models.BatchToRow(b)
h.updateResultWithRow(b.Group, row)
h.timer.Stop()
for _, child := range h.outs {
err := child.CollectBatch(b)
if err != nil {
return err
}
}
}
}
return nil
Expand Down
30 changes: 30 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,36 @@ stream
testStreamerWithOutput(t, "TestStream_SimpleMR", script, 15*time.Second, er, nil, false)
}

func TestStream_HttpOutPassThrough(t *testing.T) {

var script = `
stream
.from().measurement('cpu')
.where(lambda: "host" == 'serverA')
.window()
.period(10s)
.every(10s)
.mapReduce(influxql.count('value'))
.httpOut('unused')
.httpOut('TestStream_SimpleMR')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
10.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_SimpleMR", script, 15*time.Second, er, nil, false)
}

func TestStream_BatchGroupBy(t *testing.T) {

var script = `
Expand Down
10 changes: 3 additions & 7 deletions pipeline/http_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package pipeline
// .httpOut('top10')
//
type HTTPOutNode struct {
node
chainnode

// The relative path where the cached data is exposed
// tick:ignore
Expand All @@ -26,11 +26,7 @@ type HTTPOutNode struct {

func newHTTPOutNode(wants EdgeType, endpoint string) *HTTPOutNode {
return &HTTPOutNode{
node: node{
desc: "http_out",
wants: wants,
provides: NoEdge,
},
Endpoint: endpoint,
chainnode: newBasicChainNode("http_out", wants, wants),
Endpoint: endpoint,
}
}
3 changes: 2 additions & 1 deletion services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ func (h *Handler) AddRoute(r Route) error {
handler = cors(handler)
handler = requestID(handler)

if h.loggingEnabled {
// Logs are INFO level only enable if we are logging INFOs
if h.loggingEnabled && wlog.LogLevel() <= wlog.INFO {
handler = logging(handler, r.Name, h.Logger)
}
handler = recovery(handler, r.Name, h.Logger) // make sure recovery is always last
Expand Down