Skip to content

Commit

Permalink
feat: Plugin state-persistence (influxdata#12166)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Mar 1, 2023
1 parent e6e22f0 commit f87916a
Show file tree
Hide file tree
Showing 19 changed files with 924 additions and 24 deletions.
98 changes: 96 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"
"errors"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -106,11 +107,23 @@ func (a *Agent) Run(ctx context.Context) error {
a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval))

log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
if err := a.initPlugins(); err != nil {
return err
}

if a.Config.Persister != nil {
log.Printf("D! [agent] Initializing plugin states")
if err := a.initPersister(); err != nil {
return err
}
if err := a.Config.Persister.Load(); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
log.Print("I! [agent] State file does not exist... Skip restoring states...")
}
}

startTime := time.Now()

log.Printf("D! [agent] Connecting outputs")
Expand Down Expand Up @@ -183,6 +196,13 @@ func (a *Agent) Run(ctx context.Context) error {

wg.Wait()

if a.Config.Persister != nil {
log.Printf("D! [agent] Persisting plugin states")
if err := a.Config.Persister.Store(); err != nil {
return err
}
}

log.Printf("D! [agent] Stopped Successfully")
return err
}
Expand Down Expand Up @@ -226,6 +246,80 @@ func (a *Agent) initPlugins() error {
return nil
}

// initPersister initializes the persister and registers the plugins.
func (a *Agent) initPersister() error {
if err := a.Config.Persister.Init(); err != nil {
return err
}

for _, input := range a.Config.Inputs {
plugin, ok := input.Input.(telegraf.StatefulPlugin)
if !ok {
continue
}

name := input.LogName()
id := input.ID()
if err := a.Config.Persister.Register(id, plugin); err != nil {
return fmt.Errorf("could not register input %s: %w", name, err)
}
}

for _, processor := range a.Config.Processors {
plugin, ok := processor.Processor.(telegraf.StatefulPlugin)
if !ok {
continue
}

name := processor.LogName()
id := processor.ID()
if err := a.Config.Persister.Register(id, plugin); err != nil {
return fmt.Errorf("could not register processor %s: %w", name, err)
}
}

for _, aggregator := range a.Config.Aggregators {
plugin, ok := aggregator.Aggregator.(telegraf.StatefulPlugin)
if !ok {
continue
}

name := aggregator.LogName()
id := aggregator.ID()
if err := a.Config.Persister.Register(id, plugin); err != nil {
return fmt.Errorf("could not register aggregator %s: %w", name, err)
}
}

for _, processor := range a.Config.AggProcessors {
plugin, ok := processor.Processor.(telegraf.StatefulPlugin)
if !ok {
continue
}

name := processor.LogName()
id := processor.ID()
if err := a.Config.Persister.Register(id, plugin); err != nil {
return fmt.Errorf("could not register aggregating processor %s: %w", name, err)
}
}

for _, output := range a.Config.Outputs {
plugin, ok := output.Output.(telegraf.StatefulPlugin)
if !ok {
continue
}

name := output.LogName()
id := output.ID()
if err := a.Config.Persister.Register(id, plugin); err != nil {
return fmt.Errorf("could not register output %s: %w", name, err)
}
}

return nil
}

func (a *Agent) startInputs(
dst chan<- telegraf.Metric,
inputs []*models.RunningInput,
Expand Down
6 changes: 6 additions & 0 deletions cmd/telegraf/agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,9 @@
## translates by calling external programs snmptranslate and snmptable,
## or "gosmi" which translates using the built-in gosmi library.
# snmp_translator = "netsnmp"

## Name of the file to load the state of plugins from and store the state to.
## If uncommented and not empty, this file will be used to save the state of
## stateful plugins on termination of Telegraf. If the file exists on start,
## the state in the file will be restored for the plugins.
# statefile = ""
54 changes: 42 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/persister"
"github.com/influxdata/telegraf/plugins/aggregators"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down Expand Up @@ -83,6 +84,8 @@ type Config struct {

Deprecations map[string][]int64
version *semver.Version

Persister *persister.Persister
}

// Ordered plugins used to keep the order in which they appear in a file
Expand Down Expand Up @@ -243,6 +246,12 @@ type AgentConfig struct {
// Method for translating SNMP objects. 'netsnmp' to call external programs,
// 'gosmi' to use the built-in library.
SnmpTranslator string `toml:"snmp_translator"`

// Name of the file to load the state of plugins from and store the state to.
// If uncommented and not empty, this file will be used to save the state of
// stateful plugins on termination of Telegraf. If the file exists on start,
// the state in the file will be restored for the plugins.
Statefile string `toml:"statefile"`
}

// InputNames returns a list of strings of the configured inputs.
Expand Down Expand Up @@ -522,6 +531,13 @@ func (c *Config) LoadConfigData(data []byte) error {
})
}

// Setup the persister if requested
if c.Agent.Statefile != "" {
c.Persister = &persister.Persister{
Filename: c.Agent.Statefile,
}
}

if len(c.UnusedFields) > 0 {
return fmt.Errorf("line %d: configuration specified the fields %q, but they weren't used", tbl.Line, keys(c.UnusedFields))
}
Expand Down Expand Up @@ -962,25 +978,28 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()

processorConfig, err := c.buildProcessor(name, table)
// Setup the processor running before the aggregators
processorBeforeConfig, err := c.buildProcessor("processors", name, table)
if err != nil {
return err
}

// Setup the processor running before the aggregators
processorBefore, hasParser, err := c.setupProcessor(processorConfig.Name, creator, table)
processorBefore, hasParser, err := c.setupProcessor(processorBeforeConfig.Name, creator, table)
if err != nil {
return err
}
rf := models.NewRunningProcessor(processorBefore, processorConfig)
rf := models.NewRunningProcessor(processorBefore, processorBeforeConfig)
c.fileProcessors = append(c.fileProcessors, &OrderedPlugin{table.Line, rf})

// Setup another (new) processor instance running after the aggregator
processorAfter, _, err := c.setupProcessor(processorConfig.Name, creator, table)
processorAfterConfig, err := c.buildProcessor("aggprocessors", name, table)
if err != nil {
return err
}
processorAfter, _, err := c.setupProcessor(processorAfterConfig.Name, creator, table)
if err != nil {
return err
}
rf = models.NewRunningProcessor(processorAfter, processorConfig)
rf = models.NewRunningProcessor(processorAfter, processorAfterConfig)
c.fileAggProcessors = append(c.fileAggProcessors, &OrderedPlugin{table.Line, rf})

// Check the number of misses against the threshold
Expand Down Expand Up @@ -1235,7 +1254,10 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato
if err != nil {
return conf, err
}
return conf, nil

// Generate an ID for the plugin
conf.ID, err = generatePluginID("aggregators."+name, tbl)
return conf, err
}

// buildParser parses Parser specific items from the ast.Table,
Expand All @@ -1256,7 +1278,7 @@ func (c *Config) buildParser(name string, tbl *ast.Table) *models.ParserConfig {
// buildProcessor parses Processor specific items from the ast.Table,
// builds the filter and returns a
// models.ProcessorConfig to be inserted into models.RunningProcessor
func (c *Config) buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
func (c *Config) buildProcessor(category, name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
conf := &models.ProcessorConfig{Name: name}

c.getFieldInt64(tbl, "order", &conf.Order)
Expand All @@ -1271,7 +1293,10 @@ func (c *Config) buildProcessor(name string, tbl *ast.Table) (*models.ProcessorC
if err != nil {
return conf, err
}
return conf, nil

// Generate an ID for the plugin
conf.ID, err = generatePluginID(category+"."+name, tbl)
return conf, err
}

// buildFilter builds a Filter
Expand Down Expand Up @@ -1339,7 +1364,10 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
if err != nil {
return cp, err
}
return cp, nil

// Generate an ID for the plugin
cp.ID, err = generatePluginID("inputs."+name, tbl)
return cp, err
}

// buildSerializer grabs the necessary entries from the ast.Table for creating
Expand Down Expand Up @@ -1427,7 +1455,9 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
return nil, c.firstErr()
}

return oc, nil
// Generate an ID for the plugin
oc.ID, err = generatePluginID("outputs."+name, tbl)
return oc, err
}

func (c *Config) missingTomlField(_ reflect.Type, key string) error {
Expand Down
Loading

0 comments on commit f87916a

Please sign in to comment.