Skip to content

Commit

Permalink
make idle barrier time relative to messages
Browse files Browse the repository at this point in the history
addresses influxdata#1710

By having barrier time be based on last (point | barrier) time + idle,
the desired behavior of idle barrier for handling no data (silence)
scenarios works even when the data source clock is offset from the
kapacitor host clock.
  • Loading branch information
sputnik13 committed Dec 5, 2017
1 parent 0a691f0 commit 529c1f7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## unreleased

### Bugfixes

- [#1710](https://github.com/influxdata/kapacitor/issues/1710): Idle Barrier is dropping all messages when source has clock offset

## v1.4.0-rc3 [2017-12-04]

### Bugfixes
Expand Down
50 changes: 29 additions & 21 deletions barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,26 @@ type idleBarrier struct {
name string
group edge.GroupInfo

idle time.Duration
lastT atomic.Value
wg sync.WaitGroup
outs []edge.StatsEdge
stopC chan struct{}
resetTimerC chan struct{}
idle time.Duration
lastPointT atomic.Value
lastBarrierT atomic.Value
wg sync.WaitGroup
outs []edge.StatsEdge
stopC chan struct{}
resetTimerC chan struct{}
}

func newIdleBarrier(name string, group edge.GroupInfo, idle time.Duration, outs []edge.StatsEdge) *idleBarrier {
r := &idleBarrier{
name: name,
group: group,
idle: idle,
lastT: atomic.Value{},
wg: sync.WaitGroup{},
outs: outs,
stopC: make(chan struct{}),
resetTimerC: make(chan struct{}),
name: name,
group: group,
idle: idle,
lastPointT: atomic.Value{},
lastBarrierT: atomic.Value{},
wg: sync.WaitGroup{},
outs: outs,
stopC: make(chan struct{}),
resetTimerC: make(chan struct{}),
}

r.Init()
Expand All @@ -110,7 +112,8 @@ func newIdleBarrier(name string, group edge.GroupInfo, idle time.Duration, outs
}

func (n *idleBarrier) Init() {
n.lastT.Store(time.Time{})
n.lastPointT.Store(time.Now().UTC())
n.lastBarrierT.Store(time.Time{})
n.wg.Add(1)

go n.idleHandler()
Expand All @@ -125,8 +128,9 @@ func (n *idleBarrier) BeginBatch(m edge.BeginBatchMessage) (edge.Message, error)
return m, nil
}
func (n *idleBarrier) BatchPoint(m edge.BatchPointMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
if !m.Time().Before(n.lastBarrierT.Load().(time.Time)) {
n.resetTimer()
n.lastPointT.Store(m.Time())
return m, nil
}
return nil, nil
Expand All @@ -135,8 +139,10 @@ func (n *idleBarrier) EndBatch(m edge.EndBatchMessage) (edge.Message, error) {
return m, nil
}
func (n *idleBarrier) Barrier(m edge.BarrierMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
if !m.Time().Before(n.lastBarrierT.Load().(time.Time)) {
n.resetTimer()
n.lastPointT.Store(m.Time())
n.lastBarrierT.Store(m.Time())
return m, nil
}
return nil, nil
Expand All @@ -149,8 +155,9 @@ func (n *idleBarrier) DeleteGroup(m edge.DeleteGroupMessage) (edge.Message, erro
}

func (n *idleBarrier) Point(m edge.PointMessage) (edge.Message, error) {
if !m.Time().Before(n.lastT.Load().(time.Time)) {
if !m.Time().Before(n.lastBarrierT.Load().(time.Time)) {
n.resetTimer()
n.lastPointT.Store(m.Time())
return m, nil
}
return nil, nil
Expand All @@ -161,9 +168,10 @@ func (n *idleBarrier) resetTimer() {
}

func (n *idleBarrier) emitBarrier() error {
nowT := time.Now().UTC()
n.lastT.Store(nowT)
return edge.Forward(n.outs, edge.NewBarrierMessage(n.group, nowT))
newT := n.lastPointT.Load().(time.Time).Add(n.idle)
n.lastPointT.Store(newT)
n.lastBarrierT.Store(newT)
return edge.Forward(n.outs, edge.NewBarrierMessage(n.group, newT))
}

func (n *idleBarrier) idleHandler() {
Expand Down
1 change: 0 additions & 1 deletion integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,6 @@ stream
defer func() {
cleanupTest()

// barrier should emit at least 4 times
if rc := atomic.LoadInt32(&requestCount); rc != 2 {
t.Errorf("got %v exp %v", rc, 2)
}
Expand Down

0 comments on commit 529c1f7

Please sign in to comment.