Skip to content

Commit

Permalink
Allow overriding the collection_jitter and precision per input (influ…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and rhajek committed Jul 13, 2020
1 parent fd793e2 commit 25aa6e8
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 95 deletions.
62 changes: 46 additions & 16 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,20 @@ func (a *Agent) startInputs(

for _, input := range inputs {
if si, ok := input.Input.(telegraf.ServiceInput); ok {
// Service input plugins are not subject to timestamp rounding.
// Service input plugins are not normally subject to timestamp
// rounding except for when precision is set on the input plugin.
//
// This only applies to the accumulator passed to Start(), the
// Gather() accumulator does apply rounding according to the
// precision agent setting.
// precision and interval agent/plugin settings.
var interval time.Duration
var precision time.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
}

acc := NewAccumulator(input, dst)
acc.SetPrecision(time.Nanosecond)
acc.SetPrecision(getPrecision(precision, interval))

err := si.Start(acc)
if err != nil {
Expand All @@ -276,14 +284,24 @@ func (a *Agent) runInputs(
) error {
var wg sync.WaitGroup
for _, input := range unit.inputs {
interval := a.Config.Agent.Interval.Duration
jitter := a.Config.Agent.CollectionJitter.Duration

// Overwrite agent interval if this plugin has its own.
interval := a.Config.Agent.Interval.Duration
if input.Config.Interval != 0 {
interval = input.Config.Interval
}

// Overwrite agent precision if this plugin has its own.
precision := a.Config.Agent.Precision.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
}

// Overwrite agent collection_jitter if this plugin has its own.
jitter := a.Config.Agent.CollectionJitter.Duration
if input.Config.CollectionJitter != 0 {
jitter = input.Config.CollectionJitter
}

var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter)
Expand All @@ -293,7 +311,7 @@ func (a *Agent) runInputs(
defer ticker.Stop()

acc := NewAccumulator(input, unit.dst)
acc.SetPrecision(a.Precision())
acc.SetPrecision(getPrecision(precision, interval))

wg.Add(1)
go func(input *models.RunningInput) {
Expand Down Expand Up @@ -368,12 +386,24 @@ func (a *Agent) testRunInputs(
go func(input *models.RunningInput) {
defer wg.Done()

// Overwrite agent interval if this plugin has its own.
interval := a.Config.Agent.Interval.Duration
if input.Config.Interval != 0 {
interval = input.Config.Interval
}

// Overwrite agent precision if this plugin has its own.
precision := a.Config.Agent.Precision.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
}

// Run plugins that require multiple gathers to calculate rate
// and delta metrics twice.
switch input.Config.Name {
case "cpu", "mongodb", "procstat":
nulAcc := NewAccumulator(input, nul)
nulAcc.SetPrecision(a.Precision())
nulAcc.SetPrecision(getPrecision(precision, interval))
if err := input.Input.Gather(nulAcc); err != nil {
nulAcc.AddError(err)
}
Expand All @@ -382,7 +412,7 @@ func (a *Agent) testRunInputs(
}

acc := NewAccumulator(input, unit.dst)
acc.SetPrecision(a.Precision())
acc.SetPrecision(getPrecision(precision, interval))

if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
Expand Down Expand Up @@ -580,8 +610,11 @@ func (a *Agent) runAggregators(
go func(agg *models.RunningAggregator) {
defer wg.Done()

interval := a.Config.Agent.Interval.Duration
precision := a.Config.Agent.Precision.Duration

acc := NewAccumulator(agg, unit.aggC)
acc.SetPrecision(a.Precision())
acc.SetPrecision(getPrecision(precision, interval))
a.push(ctx, agg, acc)
}(agg)
}
Expand Down Expand Up @@ -705,8 +738,8 @@ func (a *Agent) runOutputs(

jitter := jitter
// Overwrite agent flush_jitter if this plugin has its own.
if output.Config.FlushJitter != nil {
jitter = *output.Config.FlushJitter
if output.Config.FlushJitter != 0 {
jitter = output.Config.FlushJitter
}

wg.Add(1)
Expand Down Expand Up @@ -1063,10 +1096,7 @@ func (a *Agent) once(ctx context.Context, wait time.Duration) error {
}

// Returns the rounding precision for metrics.
func (a *Agent) Precision() time.Duration {
precision := a.Config.Agent.Precision.Duration
interval := a.Config.Agent.Interval.Duration

func getPrecision(precision, interval time.Duration) time.Duration {
if precision > 0 {
return precision
}
Expand Down
108 changes: 37 additions & 71 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,44 +1075,18 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
Grace: time.Second * 0,
}

if node, ok := tbl.Fields["period"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

conf.Period = dur
}
}
if err := getConfigDuration(tbl, "period", &conf.Period); err != nil {
return nil, err
}

if node, ok := tbl.Fields["delay"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

conf.Delay = dur
}
}
if err := getConfigDuration(tbl, "delay", &conf.Delay); err != nil {
return nil, err
}

if node, ok := tbl.Fields["grace"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

conf.Grace = dur
}
}
if err := getConfigDuration(tbl, "grace", &conf.Grace); err != nil {
return nil, err
}

if node, ok := tbl.Fields["drop_original"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
Expand Down Expand Up @@ -1166,9 +1140,6 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}

delete(tbl.Fields, "period")
delete(tbl.Fields, "delay")
delete(tbl.Fields, "grace")
delete(tbl.Fields, "drop_original")
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
Expand Down Expand Up @@ -1361,17 +1332,17 @@ func buildFilter(tbl *ast.Table) (models.Filter, error) {
// models.InputConfig to be inserted into models.RunningInput
func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
cp := &models.InputConfig{Name: name}
if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

cp.Interval = dur
}
}
if err := getConfigDuration(tbl, "interval", &cp.Interval); err != nil {
return nil, err
}

if err := getConfigDuration(tbl, "precision", &cp.Precision); err != nil {
return nil, err
}

if err := getConfigDuration(tbl, "collection_jitter", &cp.CollectionJitter); err != nil {
return nil, err
}

if node, ok := tbl.Fields["name_prefix"]; ok {
Expand Down Expand Up @@ -1419,7 +1390,6 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "alias")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tags")
var err error
cp.Filter, err = buildFilter(tbl)
Expand Down Expand Up @@ -2141,30 +2111,12 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
oc.Filter.NamePass = oc.Filter.FieldPass
}

if node, ok := tbl.Fields["flush_interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

oc.FlushInterval = dur
}
}
if err := getConfigDuration(tbl, "flush_interval", &oc.FlushInterval); err != nil {
return nil, err
}

if node, ok := tbl.Fields["flush_jitter"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
oc.FlushJitter = new(time.Duration)
*oc.FlushJitter = dur
}
}
if err := getConfigDuration(tbl, "flush_jitter", &oc.FlushJitter); err != nil {
return nil, err
}

if node, ok := tbl.Fields["metric_buffer_limit"]; ok {
Expand Down Expand Up @@ -2223,8 +2175,6 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
}
}

delete(tbl.Fields, "flush_interval")
delete(tbl.Fields, "flush_jitter")
delete(tbl.Fields, "metric_buffer_limit")
delete(tbl.Fields, "metric_batch_size")
delete(tbl.Fields, "alias")
Expand All @@ -2241,3 +2191,19 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
type unwrappable interface {
Unwrap() telegraf.Processor
}

func getConfigDuration(tbl *ast.Table, key string, target *time.Duration) error {
if node, ok := tbl.Fields[key]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
d, err := time.ParseDuration(str.Value)
if err != nil {
return err
}
delete(tbl.Fields, key)
*target = d
}
}
}
return nil
}
26 changes: 22 additions & 4 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ The agent table configures Telegraf and the defaults used across all plugins.
running a large number of telegraf instances. ie, a jitter of 5s and interval
10s means flushes will happen every 10-15s.


- **precision**:
Collected metrics are rounded to the precision specified as an [interval][].

Expand Down Expand Up @@ -194,13 +193,32 @@ driven operation.
Parameters that can be used with any input plugin:

- **alias**: Name an instance of a plugin.
- **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular input should be run less or more
often, you can configure that here.

- **interval**:
Overrides the `interval` setting of the [agent][Agent] for the plugin. How
often to gather this metric. Normal plugins use a single global interval, but
if one particular input should be run less or more often, you can configure
that here.

- **precision**:
Overrides the `precision` setting of the [agent][Agent] for the plugin.
Collected metrics are rounded to the precision specified as an [interval][].

When this value is set on a service input, multiple events occuring at the
same timestamp may be merged by the output database.

- **collection_jitter**:
Overrides the `collection_jitter` setting of the [agent][Agent] for the
plugin. Collection jitter is used to jitter the collection by a random
[interval][].

- **name_override**: Override the base name of the measurement. (Default is
the name of the input).

- **name_prefix**: Specifies a prefix to attach to the measurement name.

- **name_suffix**: Specifies a suffix to attach to the measurement name.

- **tags**: A map of tags to apply to a specific input's measurements.

The [metric filtering][] parameters can be used to limit what metrics are
Expand Down
8 changes: 5 additions & 3 deletions models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {

// InputConfig is the common config for all inputs.
type InputConfig struct {
Name string
Alias string
Interval time.Duration
Name string
Alias string
Interval time.Duration
CollectionJitter time.Duration
Precision time.Duration

NameOverride string
MeasurementPrefix string
Expand Down
2 changes: 1 addition & 1 deletion models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type OutputConfig struct {
Filter Filter

FlushInterval time.Duration
FlushJitter *time.Duration
FlushJitter time.Duration
MetricBufferLimit int
MetricBatchSize int

Expand Down

0 comments on commit 25aa6e8

Please sign in to comment.