Skip to content

Commit

Permalink
Allow stateful processors only in global config
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoriano committed Feb 26, 2020
1 parent da3584d commit f51c3a8
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Variable substitution from environment variables is not longer supported. {pull}15937{15937}
- Change aws_elb autodiscover provider field name from elb_listener.* to aws.elb.*. {issue}16219[16219] {pull}16402{16402}
- Remove `AddDockerMetadata` and `AddKubernetesMetadata` processors from the `script` processor. They can still be used as normal processors in the configuration. {issue}16349[16349] {pull}16514[16514]
- Allow the use of add_docker_metadata and add_kubernetes_metadata processors in global configuration only. {issue}16349[16349] {pull}16653[16653]

*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
var processCgroupPaths = cgroup.ProcessCgroupPaths

func init() {
processors.RegisterPlugin(processorName, New)
processors.RegisterStatefulPlugin(processorName, New)
}

type addDockerMetadata struct {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type kubernetesAnnotator struct {
}

func init() {
processors.RegisterPlugin("add_kubernetes_metadata", New)
processors.RegisterStatefulPlugin("add_kubernetes_metadata", New)

// Register default indexers
Indexing.AddIndexer(PodNameIndexerName, NewPodNameIndexer)
Expand Down
12 changes: 11 additions & 1 deletion libbeat/processors/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ func NewList(log *logp.Logger) *Processors {

// New creates a list of processors from a list of free user configurations.
func New(config PluginConfig) (*Processors, error) {
return newFromRegistry(registry, config)
}

// NewStateful creates a list of processors from a list of free user configurations.
// These processors may need to be closed once they are not used anymore.
func NewStateful(config PluginConfig) (*Processors, error) {
return newFromRegistry(statefulRegistry, config)
}

func newFromRegistry(registry *Namespace, config PluginConfig) (*Processors, error) {
procs := NewList(nil)

for _, procConfig := range config {
Expand Down Expand Up @@ -84,7 +94,7 @@ func New(config PluginConfig) (*Processors, error) {
validActions = append(validActions, k)

}
return nil, errors.Errorf("the processor action %s does not exist. Valid actions: %v", actionName, strings.Join(validActions, ", "))
return nil, errors.Errorf("the processor action %s is not available in this context. Valid actions: %v", actionName, strings.Join(validActions, ", "))
}

actionCfg.PrintDebugf("Configure processor action '%v' with:", actionName)
Expand Down
17 changes: 17 additions & 0 deletions libbeat/processors/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,29 @@ func init() {
type Constructor func(config *common.Config) (Processor, error)

var registry = NewNamespace()
var statefulRegistry = NewNamespace()

// RegisterPlugin register a stateless processor
func RegisterPlugin(name string, constructor Constructor) {
logp.L().Named(logName).Debugf("Register plugin %s", name)

err := registry.Register(name, constructor)
if err != nil {
panic(err)
}

err = statefulRegistry.Register(name, constructor)
if err != nil {
panic(err)
}
}

// RegisterPlugin register processor that needs to be closed
func RegisterStatefulPlugin(name string, constructor Constructor) {
logp.L().Named(logName).Debugf("Register plugin %s", name)

err := statefulRegistry.Register(name, constructor)
if err != nil {
panic(err)
}
}
2 changes: 1 addition & 1 deletion libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func MakeDefaultSupport(
return nil, err
}

processors, err := processors.New(cfg.Processors)
processors, err := processors.NewStateful(cfg.Processors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %v", err)
}
Expand Down

0 comments on commit f51c3a8

Please sign in to comment.