Skip to content

Commit

Permalink
Missing commits from the initial merges of the agent -> master
Browse files Browse the repository at this point in the history
  • Loading branch information
ph committed Apr 10, 2020
1 parent ca65a54 commit a179399
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 21 deletions.
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (b *Beat) configure(settings Settings) error {
logp.Info("Beat ID: %v", b.Info.ID)

// initialize config manager
b.ConfigManager, err = management.Factory()(b.Config.Management, reload.Register, b.Beat.Info.ID)
b.ConfigManager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
if err != nil {
return err
}
Expand Down
25 changes: 21 additions & 4 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var Namespace = "libbeat.management"
// DebugK used as key for all things central management
var DebugK = "centralmgmt"

var centralMgmtKey = "x-pack-cm"

// ConfigManager interacts with the beat to update configurations
// from an external source
type ConfigManager interface {
Expand All @@ -47,32 +49,47 @@ type ConfigManager interface {
CheckRawConfig(cfg *common.Config) error
}

// PluginFunc for creating FactoryFunc if it matches a config
type PluginFunc func(*common.Config) FactoryFunc

// FactoryFunc for creating a config manager
type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error)

// Register a config manager
func Register(name string, fn FactoryFunc, stability feature.Stability) {
func Register(name string, fn PluginFunc, stability feature.Stability) {
f := feature.New(Namespace, name, fn, feature.MakeDetails(name, "", stability))
feature.MustRegister(f)
}

// Factory retrieves config manager constructor. If no one is registered
// it will create a nil manager
func Factory() FactoryFunc {
func Factory(cfg *common.Config) FactoryFunc {
factories, err := feature.GlobalRegistry().LookupAll(Namespace)
if err != nil {
return nilFactory
}

for _, f := range factories {
if factory, ok := f.Factory().(FactoryFunc); ok {
return factory
if plugin, ok := f.Factory().(PluginFunc); ok {
if factory := plugin(cfg); factory != nil {
return factory
}
}
}

return nilFactory
}

type modeConfig struct {
Mode string `config:"mode" yaml:"mode"`
}

func defaultModeConfig() *modeConfig {
return &modeConfig{
Mode: centralMgmtKey,
}
}

// nilManager, fallback when no manager is present
type nilManager struct{}

Expand Down
17 changes: 15 additions & 2 deletions x-pack/libbeat/management/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,23 @@ const ManagedConfigTemplate = `
#monitoring.elasticsearch:
`

// Config for central management
const (
// ModeCentralManagement is a default CM mode, using existing processes.
ModeCentralManagement = "x-pack-cm"

// ModeFleet is a management mode where fleet is used to retrieve configurations.
ModeFleet = "x-pack-fleet"
)

// Config for central management.
type Config struct {
// true when enrolled
Enabled bool `config:"enabled" yaml:"enabled"`

// Mode specifies whether beat uses Central Management or Fleet.
// Options: [cm, fleet]
Mode string `config:"mode" yaml:"mode"`

// Poll configs period
Period time.Duration `config:"period" yaml:"period"`

Expand All @@ -93,6 +105,7 @@ type EventReporterConfig struct {

func defaultConfig() *Config {
return &Config{
Mode: ModeCentralManagement,
Period: 60 * time.Second,
EventsReporter: EventReporterConfig{
Period: 30 * time.Second,
Expand All @@ -111,7 +124,7 @@ type templateParams struct {
BeatName string
}

// OverwriteConfigFile will overwrite beat settings file with the enrolled template
// OverwriteConfigFile will overwrite beat settings file with the enrolled template.
func (c *Config) OverwriteConfigFile(wr io.Writer, beatName string) error {
t := template.Must(template.New("beat.management.yml").Parse(ManagedConfigTemplate))

Expand Down
3 changes: 2 additions & 1 deletion x-pack/libbeat/management/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (er *Errors) IsEmpty() bool {
return len(*er) == 0
}

func newConfigError(err error) *Error {
// NewConfigError wraps an error to be a management error of a specific ConfigError Type
func NewConfigError(err error) *Error {
return &Error{Type: ConfigError, Err: err}
}
6 changes: 3 additions & 3 deletions x-pack/libbeat/management/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ func TestErrorSerialization(t *testing.T) {

func TestErrors(t *testing.T) {
t.Run("single error", func(t *testing.T) {
errors := Errors{newConfigError(errors.New("error1"))}
errors := Errors{NewConfigError(errors.New("error1"))}
assert.Equal(t, "1 error: error1", errors.Error())
})

t.Run("multiple errors", func(t *testing.T) {
errors := Errors{
newConfigError(errors.New("error1")),
newConfigError(errors.New("error2")),
NewConfigError(errors.New("error1")),
NewConfigError(errors.New("error2")),
}
assert.Equal(t, "2 errors: error1; error2", errors.Error())
})
Expand Down
27 changes: 27 additions & 0 deletions x-pack/libbeat/management/fleet/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package fleet

import (
xmanagement "github.com/elastic/beats/v7/x-pack/libbeat/management"
)

// Config for central management
type Config struct {
Enabled bool `config:"enabled" yaml:"enabled"`
Mode string `config:"mode" yaml:"mode"`
Blacklist xmanagement.ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"`
}

func defaultConfig() *Config {
return &Config{
Mode: xmanagement.ModeCentralManagement,
Blacklist: xmanagement.ConfigBlacklistSettings{
Patterns: map[string]string{
"output": "console|file",
},
},
}
}
60 changes: 60 additions & 0 deletions x-pack/libbeat/management/fleet/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package fleet

import (
"testing"

"github.com/elastic/beats/v7/libbeat/common"

"github.com/elastic/beats/v7/libbeat/common/reload"
)

func TestConfigBlocks(t *testing.T) {
input := `
filebeat:
inputs:
- type: log
paths:
- /var/log/hello1.log
- /var/log/hello2.log
output:
elasticsearch:
hosts:
- localhost:9200`

var cfg common.MapStr
uconfig, err := common.NewConfigFrom(input)
if err != nil {
t.Fatalf("Config blocks unsuccessfully generated: %+v", err)
}

err = uconfig.Unpack(&cfg)
if err != nil {
t.Fatalf("Config blocks unsuccessfully generated: %+v", err)
}

reg := reload.NewRegistry()
reg.Register("output", &dummyReloadable{})
reg.Register("filebeat.inputs", &dummyReloadable{})

cm := &Manager{
registry: reg,
}
blocks, err := cm.toConfigBlocks(cfg)
if err != nil {
t.Fatalf("Config blocks unsuccessfully generated: %+v", err)
}

if len(blocks) != 2 {
t.Fatalf("Expected 2 block have %d: %+v", len(blocks), blocks)
}
}

type dummyReloadable struct{}

func (dummyReloadable) Reload(config *reload.ConfigWithMeta) error {
return nil
}
32 changes: 32 additions & 0 deletions x-pack/libbeat/management/fleet/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package fleet

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/management"
xmanagement "github.com/elastic/beats/v7/x-pack/libbeat/management"
)

func init() {
management.Register("x-pack-fleet", NewFleetManagerPlugin, feature.Beta)
}

// NewFleetManagerPlugin creates a plugin function returning factory if configuration matches the criteria
func NewFleetManagerPlugin(config *common.Config) management.FactoryFunc {
c := defaultConfig()
if config.Enabled() {
if err := config.Unpack(&c); err != nil {
return nil
}

if c.Mode == xmanagement.ModeFleet {
return NewFleetManager
}
}

return nil
}
15 changes: 5 additions & 10 deletions x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/feature"

"github.com/gofrs/uuid"

Expand All @@ -26,10 +25,6 @@ import (

var errEmptyAccessToken = errors.New("access_token is empty, you must reenroll your Beat")

func init() {
management.Register("x-pack", NewConfigManager, feature.Beta)
}

// ConfigManager handles internal config updates. By retrieving
// new configs from Kibana and applying them to the Beat
type ConfigManager struct {
Expand Down Expand Up @@ -284,7 +279,7 @@ func (cm *ConfigManager) reload(t string, blocks []*api.ConfigBlock) *Error {
if len(blocks) > 1 {
err := fmt.Errorf("got an invalid number of configs for %s: %d, expected: 1", t, len(blocks))
cm.logger.Error(err)
return newConfigError(err)
return NewConfigError(err)
}

var config *reload.ConfigWithMeta
Expand All @@ -293,13 +288,13 @@ func (cm *ConfigManager) reload(t string, blocks []*api.ConfigBlock) *Error {
config, err = blocks[0].ConfigWithMeta()
if err != nil {
cm.logger.Error(err)
return newConfigError(err)
return NewConfigError(err)
}
}

if err := obj.Reload(config); err != nil {
cm.logger.Error(err)
return newConfigError(err)
return NewConfigError(err)
}
} else if obj := cm.registry.GetReloadableList(t); obj != nil {
// List
Expand All @@ -308,14 +303,14 @@ func (cm *ConfigManager) reload(t string, blocks []*api.ConfigBlock) *Error {
config, err := block.ConfigWithMeta()
if err != nil {
cm.logger.Error(err)
return newConfigError(err)
return NewConfigError(err)
}
configs = append(configs, config)
}

if err := obj.Reload(configs); err != nil {
cm.logger.Error(err)
return newConfigError(err)
return NewConfigError(err)
}
}

Expand Down
4 changes: 4 additions & 0 deletions x-pack/libbeat/management/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestConfigManager(t *testing.T) {

config := &Config{
Enabled: true,
Mode: ModeCentralManagement,
Period: 100 * time.Millisecond,
Kibana: c,
AccessToken: accessToken,
Expand Down Expand Up @@ -148,6 +149,7 @@ func TestRemoveItems(t *testing.T) {

config := &Config{
Enabled: true,
Mode: ModeCentralManagement,
Period: 100 * time.Millisecond,
Kibana: c,
AccessToken: accessToken,
Expand Down Expand Up @@ -225,6 +227,7 @@ func TestUnEnroll(t *testing.T) {

config := &Config{
Enabled: true,
Mode: ModeCentralManagement,
Period: 100 * time.Millisecond,
Kibana: c,
AccessToken: accessToken,
Expand Down Expand Up @@ -299,6 +302,7 @@ func TestBadConfig(t *testing.T) {

config := &Config{
Enabled: true,
Mode: ModeCentralManagement,
Period: 100 * time.Millisecond,
Kibana: c,
AccessToken: accessToken,
Expand Down
31 changes: 31 additions & 0 deletions x-pack/libbeat/management/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package management

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/management"
)

func init() {
management.Register("x-pack", NewManagerPlugin, feature.Beta)
}

// NewManagerPlugin creates a plugin function returning factory if configuration matches the criteria
func NewManagerPlugin(config *common.Config) management.FactoryFunc {
c := defaultConfig()
if config.Enabled() {
if err := config.Unpack(&c); err != nil {
return nil
}

if c.Mode == ModeCentralManagement {
return NewConfigManager
}
}

return nil
}

0 comments on commit a179399

Please sign in to comment.