Skip to content

Commit

Permalink
fix(processors): Correctly setup processors (#12081)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored and pull[bot] committed Mar 4, 2024
1 parent 2314386 commit 68d710f
Showing 1 changed file with 45 additions and 45 deletions.
90 changes: 45 additions & 45 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,6 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
}
return fmt.Errorf("undefined but requested processor: %s", name)
}
streamingProcessor := creator()

// For processors with parsers we need to compute the set of
// options that is not covered by both, the parser and the processor.
Expand All @@ -772,51 +771,26 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
return err
}

var processor interface{}
processor = streamingProcessor
if p, ok := streamingProcessor.(unwrappable); ok {
processor = p.Unwrap()
}

// If the (underlying) processor has a SetParser or SetParserFunc function,
// it can accept arbitrary data-formats, so build the requested parser and
// set it.
if t, ok := processor.(telegraf.ParserPlugin); ok {
missCountThreshold = 2
parser, err := c.addParser("processors", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
}

if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
missCountThreshold = 2
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.addParser("processors", name, table)
})
}

// Set up the processor
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
// Setup the processor running before the aggregators
processorBefore, hasParser, err := c.setupProcessor(processorConfig.Name, creator, table)
if err != nil {
return err
}

rf := models.NewRunningProcessor(streamingProcessor, processorConfig)
rf := models.NewRunningProcessor(processorBefore, processorConfig)
c.Processors = append(c.Processors, rf)

// Save a copy for the aggregator
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
// Setup another (new) processor instance running after the aggregator
processorAfter, _, err := c.setupProcessor(processorConfig.Name, creator, table)
if err != nil {
return err
}

rf = models.NewRunningProcessor(streamingProcessor, processorConfig)
rf = models.NewRunningProcessor(processorAfter, processorConfig)
c.AggProcessors = append(c.AggProcessors, rf)

// Check the number of misses against the threshold
if hasParser {
missCountThreshold = 2
}
for key, count := range missCount {
if count <= missCountThreshold {
continue
Expand All @@ -829,20 +803,46 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
return nil
}

func (c *Config) setupProcessorOptions(name string, processor telegraf.StreamingProcessor, table *ast.Table) error {
if p, ok := processor.(unwrappable); ok {
unwrapped := p.Unwrap()
if err := c.toml.UnmarshalTable(table, unwrapped); err != nil {
return fmt.Errorf("unmarshalling unwrappable failed: %w", err)
func (c *Config) setupProcessor(name string, creator processors.StreamingCreator, table *ast.Table) (telegraf.StreamingProcessor, bool, error) {
var hasParser bool

streamingProcessor := creator()

var processor interface{}
if p, ok := streamingProcessor.(unwrappable); ok {
processor = p.Unwrap()
} else {
processor = streamingProcessor
}

// If the (underlying) processor has a SetParser or SetParserFunc function,
// it can accept arbitrary data-formats, so build the requested parser and
// set it.
if t, ok := processor.(telegraf.ParserPlugin); ok {
parser, err := c.addParser("processors", name, table)
if err != nil {
return nil, true, fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
hasParser = true
}

if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
if !c.probeParser(table) {
return nil, false, errors.New("parser not found")
}
return c.printUserDeprecation("processors", name, unwrapped)
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.addParser("processors", name, table)
})
hasParser = true
}

if err := c.toml.UnmarshalTable(table, processor); err != nil {
return fmt.Errorf("unmarshalling failed: %w", err)
return nil, hasParser, fmt.Errorf("unmarshalling failed: %w", err)
}

return c.printUserDeprecation("processors", name, processor)
err := c.printUserDeprecation("processors", name, processor)
return streamingProcessor, hasParser, err
}

func (c *Config) addOutput(name string, table *ast.Table) error {
Expand Down

0 comments on commit 68d710f

Please sign in to comment.