diff --git a/config/config.go b/config/config.go index d6f1a9fa7b7e6..9dde85dc0ffa2 100644 --- a/config/config.go +++ b/config/config.go @@ -754,7 +754,6 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { } return fmt.Errorf("undefined but requested processor: %s", name) } - streamingProcessor := creator() // For processors with parsers we need to compute the set of // options that is not covered by both, the parser and the processor. @@ -772,51 +771,26 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { return err } - var processor interface{} - processor = streamingProcessor - if p, ok := streamingProcessor.(unwrappable); ok { - processor = p.Unwrap() - } - - // If the (underlying) processor has a SetParser or SetParserFunc function, - // it can accept arbitrary data-formats, so build the requested parser and - // set it. - if t, ok := processor.(telegraf.ParserPlugin); ok { - missCountThreshold = 2 - parser, err := c.addParser("processors", name, table) - if err != nil { - return fmt.Errorf("adding parser failed: %w", err) - } - t.SetParser(parser) - } - - if t, ok := processor.(telegraf.ParserFuncPlugin); ok { - missCountThreshold = 2 - if !c.probeParser(table) { - return errors.New("parser not found") - } - t.SetParserFunc(func() (telegraf.Parser, error) { - return c.addParser("processors", name, table) - }) - } - - // Set up the processor - if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil { + // Setup the processor running before the aggregators + processorBefore, hasParser, err := c.setupProcessor(processorConfig.Name, creator, table) + if err != nil { return err } - - rf := models.NewRunningProcessor(streamingProcessor, processorConfig) + rf := models.NewRunningProcessor(processorBefore, processorConfig) c.Processors = append(c.Processors, rf) - // Save a copy for the aggregator - if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil { + // Setup another (new) processor instance running after the aggregator + processorAfter, _, err := c.setupProcessor(processorConfig.Name, creator, table) + if err != nil { return err } - - rf = models.NewRunningProcessor(streamingProcessor, processorConfig) + rf = models.NewRunningProcessor(processorAfter, processorConfig) c.AggProcessors = append(c.AggProcessors, rf) // Check the number of misses against the threshold + if hasParser { + missCountThreshold = 2 + } for key, count := range missCount { if count <= missCountThreshold { continue @@ -829,20 +803,46 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { return nil } -func (c *Config) setupProcessorOptions(name string, processor telegraf.StreamingProcessor, table *ast.Table) error { - if p, ok := processor.(unwrappable); ok { - unwrapped := p.Unwrap() - if err := c.toml.UnmarshalTable(table, unwrapped); err != nil { - return fmt.Errorf("unmarshalling unwrappable failed: %w", err) +func (c *Config) setupProcessor(name string, creator processors.StreamingCreator, table *ast.Table) (telegraf.StreamingProcessor, bool, error) { + var hasParser bool + + streamingProcessor := creator() + + var processor interface{} + if p, ok := streamingProcessor.(unwrappable); ok { + processor = p.Unwrap() + } else { + processor = streamingProcessor + } + + // If the (underlying) processor has a SetParser or SetParserFunc function, + // it can accept arbitrary data-formats, so build the requested parser and + // set it. + if t, ok := processor.(telegraf.ParserPlugin); ok { + parser, err := c.addParser("processors", name, table) + if err != nil { + return nil, true, fmt.Errorf("adding parser failed: %w", err) + } + t.SetParser(parser) + hasParser = true + } + + if t, ok := processor.(telegraf.ParserFuncPlugin); ok { + if !c.probeParser(table) { + return nil, false, errors.New("parser not found") } - return c.printUserDeprecation("processors", name, unwrapped) + t.SetParserFunc(func() (telegraf.Parser, error) { + return c.addParser("processors", name, table) + }) + hasParser = true } if err := c.toml.UnmarshalTable(table, processor); err != nil { - return fmt.Errorf("unmarshalling failed: %w", err) + return nil, hasParser, fmt.Errorf("unmarshalling failed: %w", err) } - return c.printUserDeprecation("processors", name, processor) + err := c.printUserDeprecation("processors", name, processor) + return streamingProcessor, hasParser, err } func (c *Config) addOutput(name string, table *ast.Table) error {