Skip to content

Commit

Permalink
fix(regression): Fixes problem with metrics not exposed by plugins. (i…
Browse files Browse the repository at this point in the history
  • Loading branch information
zak-pawel authored Oct 14, 2022
1 parent 90d8f42 commit ab293e8
Showing 1 changed file with 47 additions and 44 deletions.
91 changes: 47 additions & 44 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,60 +280,57 @@ func (a *Agent) runInputs(
unit *inputUnit,
) {
var wg sync.WaitGroup
tickers := make([]Ticker, len(unit.inputs))
for _, input := range unit.inputs {
a.runInput(ctx, startTime, unit, input, &wg)
}

wg.Wait()
// Overwrite agent interval if this plugin has its own.
interval := time.Duration(a.Config.Agent.Interval)
if input.Config.Interval != 0 {
interval = input.Config.Interval
}

log.Printf("D! [agent] Stopping service inputs")
stopServiceInputs(unit.inputs)
// Overwrite agent precision if this plugin has its own.
precision := time.Duration(a.Config.Agent.Precision)
if input.Config.Precision != 0 {
precision = input.Config.Precision
}

close(unit.dst)
log.Printf("D! [agent] Input channel closed")
}
// Overwrite agent collection_jitter if this plugin has its own.
jitter := time.Duration(a.Config.Agent.CollectionJitter)
if input.Config.CollectionJitter != 0 {
jitter = input.Config.CollectionJitter
}

func (a *Agent) runInput(ctx context.Context, startTime time.Time, unit *inputUnit, input *models.RunningInput, wg *sync.WaitGroup) {
// Overwrite agent interval if this plugin has its own.
interval := time.Duration(a.Config.Agent.Interval)
if input.Config.Interval != 0 {
interval = input.Config.Interval
}
// Overwrite agent collection_offset if this plugin has its own.
offset := time.Duration(a.Config.Agent.CollectionOffset)
if input.Config.CollectionOffset != 0 {
offset = input.Config.CollectionOffset
}

// Overwrite agent precision if this plugin has its own.
precision := time.Duration(a.Config.Agent.Precision)
if input.Config.Precision != 0 {
precision = input.Config.Precision
}
var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter, offset)
} else {
ticker = NewUnalignedTicker(interval, jitter, offset)
}
tickers = append(tickers, ticker)

// Overwrite agent collection_jitter if this plugin has its own.
jitter := time.Duration(a.Config.Agent.CollectionJitter)
if input.Config.CollectionJitter != 0 {
jitter = input.Config.CollectionJitter
}
acc := NewAccumulator(input, unit.dst)
acc.SetPrecision(getPrecision(precision, interval))

// Overwrite agent collection_offset if this plugin has its own.
offset := time.Duration(a.Config.Agent.CollectionOffset)
if input.Config.CollectionOffset != 0 {
offset = input.Config.CollectionOffset
}

var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter, offset)
} else {
ticker = NewUnalignedTicker(interval, jitter, offset)
wg.Add(1)
go func(input *models.RunningInput) {
defer wg.Done()
a.gatherLoop(ctx, acc, input, ticker, interval)
}(input)
}
defer ticker.Stop()
defer stopTickers(tickers)
wg.Wait()

acc := NewAccumulator(input, unit.dst)
acc.SetPrecision(getPrecision(precision, interval))
log.Printf("D! [agent] Stopping service inputs")
stopServiceInputs(unit.inputs)

wg.Add(1)
go func(input *models.RunningInput) {
defer wg.Done()
a.gatherLoop(ctx, acc, input, ticker, interval)
}(input)
close(unit.dst)
log.Printf("D! [agent] Input channel closed")
}

// testStartInputs is a variation of startInputs for use in --test and --once
Expand Down Expand Up @@ -1110,3 +1107,9 @@ func panicRecover(input *models.RunningInput) {
"https://github.com/influxdata/telegraf/issues/new/choose")
}
}

func stopTickers(tickers []Ticker) {
for _, ticker := range tickers {
ticker.Stop()
}
}

0 comments on commit ab293e8

Please sign in to comment.