diff --git a/CHANGELOG.md b/CHANGELOG.md index 24c5d1d66..47bd0495b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,7 @@ With #144 you can now join streams with differing group by dimensions. - [#304](https://github.com/influxdata/kapacitor/issues/304): Fix panic if recording query but do not have an InfluxDB instance configured - [#289](https://github.com/influxdata/kapacitor/issues/289): Add better error handling to batch node. - [#142](https://github.com/influxdata/kapacitor/issues/142): Fixes bug when defining multiple influxdb hosts. +- [#333](https://github.com/influxdata/kapacitor/issues/333): Fixes hang when replaying with .stats node. Fixes issues with batch and stats. ## v0.10.1 [2016-02-08] diff --git a/batch.go b/batch.go index 514890fa4..73a788cb0 100644 --- a/batch.go +++ b/batch.go @@ -52,7 +52,7 @@ func (s *SourceBatchNode) addParentEdge(in *Edge) { func (s *SourceBatchNode) start([]byte) { } -func (s *SourceBatchNode) Err() error { +func (s *SourceBatchNode) Wait() error { return nil } diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 8af6a2b1e..ac239fd07 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -3226,8 +3226,9 @@ func fastForwardTask( return err } tm.Drain() + et.StopStats() // Wait till the task is finished - if err := et.Err(); err != nil { + if err := et.Wait(); err != nil { return err } return nil diff --git a/node.go b/node.go index e321b5e22..ee4cab897 100644 --- a/node.go +++ b/node.go @@ -35,7 +35,7 @@ type Node interface { restore(snapshot []byte) error // wait for the node to finish processing and return any errors - Err() error + Wait() error // link specified child linkChild(c Node) error @@ -133,7 +133,7 @@ func (n *node) snapshot() (b []byte, err error) { return } // no-op restore func (n *node) restore([]byte) error { return nil } -func (n *node) Err() error { +func (n *node) Wait() error { n.finishedMu.Lock() defer n.finishedMu.Unlock() if !n.finished 
{ @@ -254,10 +254,11 @@ type nodeStats struct { } // Return a copy of the current node statistics. +// If no groups have been seen yet, a NilGroup will be created with zero stats. func (n *node) nodeStatsByGroup() (stats map[models.GroupID]nodeStats) { // Get the counts for just one output. + stats = make(map[models.GroupID]nodeStats) if len(n.outs) > 0 { - stats = make(map[models.GroupID]nodeStats) n.outs[0].readGroupStats(func(group models.GroupID, c, e int64, tags models.Tags, dims []string) { stats[group] = nodeStats{ Fields: models.Fields{ @@ -269,6 +270,14 @@ func (n *node) nodeStatsByGroup() (stats map[models.GroupID]nodeStats) { } }) } + if len(stats) == 0 { + // If we have no groups/stats add nil group with emitted = 0 + stats[models.NilGroup] = nodeStats{ + Fields: models.Fields{ + "emitted": int64(0), + }, + } + } return } diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 0de3a0cba..63ffedf66 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -50,11 +50,17 @@ func CreatePipeline(script string, sourceEdge EdgeType, scope *tick.Scope, deadm if err != nil { return nil, err } - if sourceEdge == StreamEdge && deadman.Global() { - src.(*SourceStreamNode).Deadman(deadman.Threshold(), deadman.Interval()) + if deadman.Global() { + switch s := src.(type) { + case *SourceStreamNode: + s.Deadman(deadman.Threshold(), deadman.Interval()) + case *SourceBatchNode: + s.Deadman(deadman.Threshold(), deadman.Interval()) + default: + return nil, fmt.Errorf("source edge type must be either Stream or Batch not %s", sourceEdge) + } } return p, nil - } func (p *Pipeline) addSource(src Node) { diff --git a/services/replay/service.go b/services/replay/service.go index 219921740..ec31c07d5 100644 --- a/services/replay/service.go +++ b/services/replay/service.go @@ -219,8 +219,11 @@ func (r *Service) handleReplay(w http.ResponseWriter, req *http.Request) { // Drain tm so the task can finish tm.Drain() + // Stop stats nodes + et.StopStats() + // Check for 
error on task - err = et.Err() + err = et.Wait() if err != nil { httpd.HttpError(w, "task run: "+err.Error(), true, http.StatusInternalServerError) return diff --git a/services/task_store/service.go b/services/task_store/service.go index 67bebeed8..86bc426a7 100644 --- a/services/task_store/service.go +++ b/services/task_store/service.go @@ -629,7 +629,7 @@ func (ts *Service) StartTask(t *kapacitor.Task) error { go func() { // Wait for task to finish - err := et.Err() + err := et.Wait() // Stop task ts.TaskMaster.StopTask(et.Task.Name) diff --git a/stats.go b/stats.go index 71b5a642e..a8cf354e1 100644 --- a/stats.go +++ b/stats.go @@ -3,6 +3,7 @@ package kapacitor import ( "fmt" "log" + "sync" "time" "github.com/influxdata/kapacitor/models" @@ -14,6 +15,8 @@ type StatsNode struct { s *pipeline.StatsNode en Node closing chan struct{} + closed bool + mu sync.Mutex } // Create a new StreamNode which filters data from a source. @@ -69,5 +72,10 @@ func (s *StatsNode) runStats([]byte) error { } func (s *StatsNode) stopStats() { - close(s.closing) + s.mu.Lock() + defer s.mu.Unlock() + if !s.closed { + s.closed = true + close(s.closing) + } } diff --git a/task.go b/task.go index 6adbfabb3..12ac95913 100644 --- a/task.go +++ b/task.go @@ -200,7 +200,7 @@ func (et *ExecutingTask) stop() (err error) { close(et.stopping) et.walk(func(n Node) error { n.stop() - e := n.Err() + e := n.Wait() if e != nil { err = e } @@ -270,10 +270,20 @@ func (et *ExecutingTask) checkDBRPs(batcher *SourceBatchNode) error { return nil } +// Stop all stats nodes +func (et *ExecutingTask) StopStats() { + et.walk(func(n Node) error { + if s, ok := n.(*StatsNode); ok { + s.stopStats() + } + return nil + }) +} + // Wait till the task finishes and return any error -func (et *ExecutingTask) Err() error { +func (et *ExecutingTask) Wait() error { return et.rwalk(func(n Node) error { - return n.Err() + return n.Wait() }) }