Skip to content

Commit

Permalink
Rework plugin tickers to prevent drift and spread write ticks (influx…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and idohalevi committed Sep 23, 2020
1 parent c549ecf commit af4620b
Show file tree
Hide file tree
Showing 7 changed files with 527 additions and 101 deletions.
99 changes: 34 additions & 65 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,57 +265,45 @@ func (a *Agent) runInputs(
interval = input.Config.Interval
}

var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter)
} else {
ticker = NewUnalignedTicker(interval, jitter)
}
defer ticker.Stop()

acc := NewAccumulator(input, dst)
acc.SetPrecision(a.Precision())

wg.Add(1)
go func(input *models.RunningInput) {
defer wg.Done()

if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}

a.gatherOnInterval(ctx, acc, input, interval, jitter)
a.gatherLoop(ctx, acc, input, ticker)
}(input)
}
wg.Wait()

wg.Wait()
return nil
}

// gather runs an input's gather function periodically until the context is
// done.
func (a *Agent) gatherOnInterval(
func (a *Agent) gatherLoop(
ctx context.Context,
acc telegraf.Accumulator,
input *models.RunningInput,
interval time.Duration,
jitter time.Duration,
ticker Ticker,
) {
defer panicRecover(input)

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
err := internal.SleepContext(ctx, internal.RandomDuration(jitter))
if err != nil {
return
}

err = a.gatherOnce(acc, input, interval)
if err != nil {
acc.AddError(err)
}

select {
case <-ticker.C:
continue
case <-ticker.Elapsed():
err := a.gatherOnce(acc, input, ticker)
if err != nil {
acc.AddError(err)
}
case <-ctx.Done():
return
}
Expand All @@ -327,11 +315,8 @@ func (a *Agent) gatherOnInterval(
func (a *Agent) gatherOnce(
acc telegraf.Accumulator,
input *models.RunningInput,
timeout time.Duration,
ticker Ticker,
) error {
ticker := time.NewTicker(timeout)
defer ticker.Stop()

done := make(chan error)
go func() {
done <- input.Gather(acc)
Expand All @@ -341,7 +326,7 @@ func (a *Agent) gatherOnce(
select {
case err := <-done:
return err
case <-ticker.C:
case <-ticker.Elapsed():
log.Printf("W! [agent] [%s] did not complete within its interval",
input.LogName())
}
Expand Down Expand Up @@ -514,10 +499,13 @@ func (a *Agent) runOutputs(
jitter = *output.Config.FlushJitter
}

ticker := NewRollingTicker(interval, jitter)
defer ticker.Stop()

wg.Add(1)
go func(output *models.RunningOutput) {
defer wg.Done()
a.flushLoop(ctx, startTime, output, interval, jitter)
a.flushLoop(ctx, output, ticker)
}(output)
}

Expand All @@ -542,10 +530,8 @@ func (a *Agent) runOutputs(
// done.
func (a *Agent) flushLoop(
ctx context.Context,
startTime time.Time,
output *models.RunningOutput,
interval time.Duration,
jitter time.Duration,
ticker Ticker,
) {
logError := func(err error) {
if err != nil {
Expand All @@ -558,44 +544,30 @@ func (a *Agent) flushLoop(
watchForFlushSignal(flushRequested)
defer stopListeningForFlushSignal(flushRequested)

// align to round interval
if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}

// since we are watching two channels we need a ticker with the jitter
// integrated.
ticker := NewTicker(interval, jitter)
defer ticker.Stop()

for {
// Favor shutdown over other methods.
select {
case <-ctx.Done():
logError(a.flushOnce(output, interval, output.Write))
logError(a.flushOnce(output, ticker, output.Write))
return
default:
}

select {
case <-ctx.Done():
logError(a.flushOnce(output, interval, output.Write))
logError(a.flushOnce(output, ticker, output.Write))
return
case <-ticker.C:
logError(a.flushOnce(output, interval, output.Write))
case <-ticker.Elapsed():
logError(a.flushOnce(output, ticker, output.Write))
case <-flushRequested:
logError(a.flushOnce(output, interval, output.Write))
logError(a.flushOnce(output, ticker, output.Write))
case <-output.BatchReady:
// Favor the ticker over batch ready
select {
case <-ticker.C:
logError(a.flushOnce(output, interval, output.Write))
case <-ticker.Elapsed():
logError(a.flushOnce(output, ticker, output.Write))
default:
logError(a.flushOnce(output, interval, output.WriteBatch))
logError(a.flushOnce(output, ticker, output.WriteBatch))
}
}
}
Expand All @@ -605,12 +577,9 @@ func (a *Agent) flushLoop(
// interval it fails to complete before.
func (a *Agent) flushOnce(
output *models.RunningOutput,
timeout time.Duration,
ticker Ticker,
writeFunc func() error,
) error {
ticker := time.NewTicker(timeout)
defer ticker.Stop()

done := make(chan error)
go func() {
done <- writeFunc()
Expand All @@ -621,7 +590,7 @@ func (a *Agent) flushOnce(
case err := <-done:
output.LogBufferStatus()
return err
case <-ticker.C:
case <-ticker.Elapsed():
log.Printf("W! [agent] [%q] did not complete within its flush interval",
output.LogName())
output.LogBufferStatus()
Expand Down
Loading

0 comments on commit af4620b

Please sign in to comment.