Skip to content

Commit

Permalink
deadman fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Mar 15, 2016
1 parent 0b43cc3 commit bf76728
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ With #144 you can now join streams with differing group by dimensions.
- [#304](https://github.com/influxdata/kapacitor/issues/304): Fix panic when recording a query without an InfluxDB instance configured.
- [#289](https://github.com/influxdata/kapacitor/issues/289): Add better error handling to batch node.
- [#142](https://github.com/influxdata/kapacitor/issues/142): Fixes bug when defining multiple influxdb hosts.
- [#333](https://github.com/influxdata/kapacitor/issues/333): Fixes hang when replaying with .stats node. Fixes issues with batch and stats.

## v0.10.1 [2016-02-08]

Expand Down
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *SourceBatchNode) addParentEdge(in *Edge) {
// start is a no-op for SourceBatchNode; the byte slice argument is ignored.
func (s *SourceBatchNode) start(_ []byte) {}

func (s *SourceBatchNode) Err() error {
func (s *SourceBatchNode) Wait() error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3226,8 +3226,9 @@ func fastForwardTask(
return err
}
tm.Drain()
et.StopStats()
// Wait till the task is finished
if err := et.Err(); err != nil {
if err := et.Wait(); err != nil {
return err
}
return nil
Expand Down
15 changes: 12 additions & 3 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Node interface {
restore(snapshot []byte) error

// wait for the node to finish processing and return any errors
Err() error
Wait() error

// link specified child
linkChild(c Node) error
Expand Down Expand Up @@ -133,7 +133,7 @@ func (n *node) snapshot() (b []byte, err error) { return }
// no-op restore
func (n *node) restore([]byte) error { return nil }

func (n *node) Err() error {
func (n *node) Wait() error {
n.finishedMu.Lock()
defer n.finishedMu.Unlock()
if !n.finished {
Expand Down Expand Up @@ -254,10 +254,11 @@ type nodeStats struct {
}

// Return a copy of the current node statistics.
// If no groups have been seen yet, a NilGroup will be created with zero stats.
func (n *node) nodeStatsByGroup() (stats map[models.GroupID]nodeStats) {
// Get the counts for just one output.
stats = make(map[models.GroupID]nodeStats)
if len(n.outs) > 0 {
stats = make(map[models.GroupID]nodeStats)
n.outs[0].readGroupStats(func(group models.GroupID, c, e int64, tags models.Tags, dims []string) {
stats[group] = nodeStats{
Fields: models.Fields{
Expand All @@ -269,6 +270,14 @@ func (n *node) nodeStatsByGroup() (stats map[models.GroupID]nodeStats) {
}
})
}
if len(stats) == 0 {
// If we have no groups/stats add nil group with emitted = 0
stats[models.NilGroup] = nodeStats{
Fields: models.Fields{
"emitted": int64(0),
},
}
}
return
}

Expand Down
12 changes: 9 additions & 3 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,17 @@ func CreatePipeline(script string, sourceEdge EdgeType, scope *tick.Scope, deadm
if err != nil {
return nil, err
}
if sourceEdge == StreamEdge && deadman.Global() {
src.(*SourceStreamNode).Deadman(deadman.Threshold(), deadman.Interval())
if deadman.Global() {
switch s := src.(type) {
case *SourceStreamNode:
s.Deadman(deadman.Threshold(), deadman.Interval())
case *SourceBatchNode:
s.Deadman(deadman.Threshold(), deadman.Interval())
default:
return nil, fmt.Errorf("source edge type must be either Stream or Batch not %s", sourceEdge)
}
}
return p, nil

}

func (p *Pipeline) addSource(src Node) {
Expand Down
5 changes: 4 additions & 1 deletion services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,11 @@ func (r *Service) handleReplay(w http.ResponseWriter, req *http.Request) {
// Drain tm so the task can finish
tm.Drain()

// Stop stats nodes
et.StopStats()

// Check for error on task
err = et.Err()
err = et.Wait()
if err != nil {
httpd.HttpError(w, "task run: "+err.Error(), true, http.StatusInternalServerError)
return
Expand Down
2 changes: 1 addition & 1 deletion services/task_store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (ts *Service) StartTask(t *kapacitor.Task) error {

go func() {
// Wait for task to finish
err := et.Err()
err := et.Wait()
// Stop task
ts.TaskMaster.StopTask(et.Task.Name)

Expand Down
12 changes: 11 additions & 1 deletion stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kapacitor
import (
"fmt"
"log"
"sync"
"time"

"github.com/influxdata/kapacitor/models"
Expand All @@ -14,6 +15,8 @@ type StatsNode struct {
s *pipeline.StatsNode
en Node
closing chan struct{}
closed bool
mu sync.Mutex
}

// Create a new StreamNode which filters data from a source.
Expand Down Expand Up @@ -44,6 +47,7 @@ func (s *StatsNode) runStats([]byte) error {
for {
select {
case <-s.closing:
s.logger.Println("D! runStats done")
return nil
case now := <-ticker.C:
s.timer.Start()
Expand All @@ -69,5 +73,11 @@ func (s *StatsNode) runStats([]byte) error {
}

func (s *StatsNode) stopStats() {
close(s.closing)
s.logger.Println("D! stopStats")
s.mu.Lock()
defer s.mu.Unlock()
if !s.closed {
s.closed = true
close(s.closing)
}
}
16 changes: 13 additions & 3 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (et *ExecutingTask) stop() (err error) {
close(et.stopping)
et.walk(func(n Node) error {
n.stop()
e := n.Err()
e := n.Wait()
if e != nil {
err = e
}
Expand Down Expand Up @@ -270,10 +270,20 @@ func (et *ExecutingTask) checkDBRPs(batcher *SourceBatchNode) error {
return nil
}

// StopStats walks the nodes of the task and calls stopStats on every node
// that is a *StatsNode. Nodes of any other type are left untouched.
func (et *ExecutingTask) StopStats() {
	stopIfStats := func(n Node) error {
		statsNode, isStats := n.(*StatsNode)
		if isStats {
			statsNode.stopStats()
		}
		// The visitor never reports an error.
		return nil
	}
	et.walk(stopIfStats)
}

// Wait till the task finishes and return any error
func (et *ExecutingTask) Err() error {
func (et *ExecutingTask) Wait() error {
return et.rwalk(func(n Node) error {
return n.Err()
return n.Wait()
})
}

Expand Down

0 comments on commit bf76728

Please sign in to comment.