Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Agent] Grpc support for agent to beat communication #14835

Merged
merged 31 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5a8be14
initial
michalpristas Jul 23, 2019
d0612be
yaml parsing
michalpristas Jul 29, 2019
67b7c5d
registering fleet
michalpristas Jul 31, 2019
f555f26
cleanup
michalpristas Jul 31, 2019
99e776b
splitting []iface into []map[string]iface
michalpristas Jul 31, 2019
5ed6102
moved from filebeat to libbeat
michalpristas Aug 1, 2019
03345f1
added vendor
michalpristas Aug 1, 2019
c3814f2
removed binaries
michalpristas Aug 1, 2019
681b637
missing comments
michalpristas Aug 1, 2019
a63f94f
metricbeat spec
michalpristas Aug 2, 2019
f3e0fb2
x-pack/metricbeat supports fleet management
michalpristas Aug 2, 2019
c99a358
global support in x-pack beats for management/fleet
michalpristas Aug 2, 2019
2578888
added submodule
michalpristas Aug 5, 2019
ff0b8cd
start only on run, prevents running on setup or similar commands
michalpristas Aug 5, 2019
2df6934
removed git submodule
michalpristas Nov 28, 2019
ee20f38
fixed fleet imports
michalpristas Nov 28, 2019
69ada7e
removed vendor fleet
michalpristas Nov 28, 2019
0c4d113
fixed tests
michalpristas Nov 28, 2019
990564f
fixed tests
michalpristas Nov 28, 2019
876f9bf
more robust factory method
michalpristas Nov 28, 2019
4bb740f
proper teardown
michalpristas Nov 29, 2019
7524167
resovled conflicts with fleet
michalpristas Dec 2, 2019
dcfa125
resolved conflicts
michalpristas Dec 5, 2019
24ce521
resolved comments
michalpristas Dec 6, 2019
2b6ab97
enabled cm
michalpristas Dec 6, 2019
971189f
set cm
michalpristas Dec 6, 2019
84e6ddc
plugin factory introduced
michalpristas Dec 6, 2019
1c1f341
i'm nothing but a hound dog
michalpristas Dec 6, 2019
86c7ba1
missing files
michalpristas Dec 9, 2019
06894d2
comments
michalpristas Dec 9, 2019
d56f46d
resolved comments
michalpristas Dec 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common/reload"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/management"
"github.com/elastic/beats/libbeat/monitoring"
Expand Down
1 change: 1 addition & 0 deletions filebeat/filebeat.spec
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"BinaryPath":"filebeat","Args":["-e"],"Configurable":"grpc"}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move that to YAML spec files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean classic filebeat.yml?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am talking about the files in x-pack/agent/spec.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

x-pack/agent/spec are transformation rules. this file should describe specific binary.
if we stick to agent.version == beat.version it is fine to have it there. but we probably still need to support {program}.spec for not yet known programs. e.g running endpoint or community beat eventually, so agent knows if specific binary supports grpc protocol or if there are some arguments needed to start binary correctly.
i'm not sure what is a correct way of handling this. maybe lookup local spec + fallback to {program}.spec with local having priority?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the endpoint case, I think we would just have a specific spec for them. Concerning the community beats its a complete other madness and open the door to all sort of problems. We would need a way to add signatures and support the signing of community beats or expose that. :)

For the POC I think we can keep that way but let's create a followup issue to refactor that this sound like a P3 for the priority.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i created a new issue here: #14985

2 changes: 1 addition & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ func (b *Beat) configure(settings Settings) error {
logp.Info("Beat ID: %v", b.Info.ID)

// initialize config manager
b.ConfigManager, err = management.Factory()(b.Config.Management, reload.Register, b.Beat.Info.ID)
b.ConfigManager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
if err != nil {
return err
}
Expand Down
25 changes: 21 additions & 4 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var Namespace = "libbeat.management"
// DebugK used as key for all things central management
var DebugK = "centralmgmt"

var centralMgmtKey = "x-pack-cm"

// ConfigManager interacts with the beat to update configurations
// from an external source
type ConfigManager interface {
Expand All @@ -47,32 +49,47 @@ type ConfigManager interface {
CheckRawConfig(cfg *common.Config) error
}

// PluginFunc for creating FactoryFunc if it matches a config
type PluginFunc func(*common.Config) FactoryFunc

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type PluginFunc should have comment or be unexported

michalpristas marked this conversation as resolved.
Show resolved Hide resolved
michalpristas marked this conversation as resolved.
Show resolved Hide resolved

// FactoryFunc for creating a config manager
type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error)

// Register a config manager
func Register(name string, fn FactoryFunc, stability feature.Stability) {
func Register(name string, fn PluginFunc, stability feature.Stability) {
f := feature.New(Namespace, name, fn, feature.NewDetails(name, "", stability))
feature.MustRegister(f)
}

// Factory retrieves config manager constructor. If no one is registered
// it will create a nil manager
func Factory() FactoryFunc {
func Factory(cfg *common.Config) FactoryFunc {
factories, err := feature.GlobalRegistry().LookupAll(Namespace)
if err != nil {
return nilFactory
}

for _, f := range factories {
if factory, ok := f.Factory().(FactoryFunc); ok {
return factory
if plugin, ok := f.Factory().(PluginFunc); ok {
if factory := plugin(cfg); factory != nil {
return factory
}
}
}

return nilFactory
}

type modeConfig struct {
Mode string `config:"mode" yaml:"mode"`
}

func defaultModeConfig() *modeConfig {
return &modeConfig{
Mode: centralMgmtKey,
}
}

// nilManager, fallback when no manager is present
type nilManager struct{}

Expand Down
4 changes: 0 additions & 4 deletions x-pack/agent/pkg/agent/configrequest/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ const (
StepRun = "sc-run"
// StepRemove is a name of Remove program event causing beat in version to be uninstalled
StepRemove = "sc-remove"
// StepStartSidecar is a name of start program monitoring event
StepStartSidecar = "sc-sidecar-start"
// StepStopSidecar is a name of stop program monitoring event
StepStopSidecar = "sc-sidecar-stop"

// MetaConfigKey is key used to store configuration in metadata
MetaConfigKey = "config"
Expand Down
23 changes: 15 additions & 8 deletions x-pack/agent/pkg/agent/operation/operator_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ func (o *Operator) initHandlerMap() {

hm[configrequest.StepRun] = o.handleRun
hm[configrequest.StepRemove] = o.handleRemove
hm[configrequest.StepStartSidecar] = o.handleStartSidecar
hm[configrequest.StepStopSidecar] = o.handleStopSidecar

o.handlers = hm
}
Expand All @@ -46,7 +44,7 @@ func (o *Operator) handleRemove(step configrequest.Step) error {

p, _, err := getProgramFromStep(step)
if err != nil {
return errors.Wrap(err, "operator.handleStart failed to create program")
return errors.Wrap(err, "operator.handleRemove failed to stop program")
}

return o.stop(p)
Expand All @@ -67,15 +65,24 @@ func getProgramFromStepWithTags(step configrequest.Step, tags map[app.Tag]string
}

func getConfigFromStep(step configrequest.Step) (map[string]interface{}, error) {
metConfig, ok := step.Meta[configrequest.MetaConfigKey]
if !ok {
metConfig, hasConfig := step.Meta[configrequest.MetaConfigKey]

if !hasConfig && needsMetaConfig(step) {
return nil, fmt.Errorf("step: %s, no config in metadata", step.ID)
}

config, ok := metConfig.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("step: %s, program config is in invalid format", step.ID)
var config map[string]interface{}
if hasConfig {
var ok bool
config, ok = metConfig.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("step: %s, program config is in invalid format", step.ID)
}
}

return config, nil
}

func needsMetaConfig(step configrequest.Step) bool {
return step.ID == configrequest.StepRun
}
michalpristas marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions x-pack/filebeat/filebeat.spec
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"BinaryPath":"filebeat","Args":["-e"],"Configurable":"grpc"}
5 changes: 5 additions & 0 deletions x-pack/libbeat/cmd/inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ import (

// register central management
"github.com/elastic/beats/x-pack/libbeat/licenser"

// Register Central Management
_ "github.com/elastic/beats/x-pack/libbeat/management"

// Register fleet
_ "github.com/elastic/beats/x-pack/libbeat/management/fleet"

// register autodiscover providers
_ "github.com/elastic/beats/x-pack/libbeat/autodiscover/providers/aws/elb"
)
Expand Down
17 changes: 15 additions & 2 deletions x-pack/libbeat/management/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,23 @@ const ManagedConfigTemplate = `
#monitoring.elasticsearch:
`

// Config for central management
const (
// ModeCentralManagement is a default CM mode, using existing processes.
ModeCentralManagement = "x-pack-cm"

// ModeFleet is a management mode where fleet is used to retrieve configurations.
ModeFleet = "x-pack-fleet"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add a period after the sentence.

)

michalpristas marked this conversation as resolved.
Show resolved Hide resolved
// Config for central management.
type Config struct {
// true when enrolled
Enabled bool `config:"enabled" yaml:"enabled"`

// Mode specifies whether beat uses Central Management or Fleet.
// Options: [cm, fleet]
Mode string `config:"mode" yaml:"mode"`

// Poll configs period
Period time.Duration `config:"period" yaml:"period"`

Expand All @@ -93,6 +105,7 @@ type EventReporterConfig struct {

func defaultConfig() *Config {
return &Config{
Mode: ModeCentralManagement,
Period: 60 * time.Second,
EventsReporter: EventReporterConfig{
Period: 30 * time.Second,
Expand All @@ -111,7 +124,7 @@ type templateParams struct {
BeatName string
}

// OverwriteConfigFile will overwrite beat settings file with the enrolled template
// OverwriteConfigFile will overwrite beat settings file with the enrolled template.
func (c *Config) OverwriteConfigFile(wr io.Writer, beatName string) error {
t := template.Must(template.New("beat.management.yml").Parse(ManagedConfigTemplate))

Expand Down
3 changes: 2 additions & 1 deletion x-pack/libbeat/management/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (er *Errors) IsEmpty() bool {
return len(*er) == 0
}

func newConfigError(err error) *Error {
// NewConfigError wraps an error to be a management error of a specific ConfigError Type
func NewConfigError(err error) *Error {
return &Error{Type: ConfigError, Err: err}
}
6 changes: 3 additions & 3 deletions x-pack/libbeat/management/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ func TestErrorSerialization(t *testing.T) {

func TestErrors(t *testing.T) {
t.Run("single error", func(t *testing.T) {
errors := Errors{newConfigError(errors.New("error1"))}
errors := Errors{NewConfigError(errors.New("error1"))}
assert.Equal(t, "1 error: error1", errors.Error())
})

t.Run("multiple errors", func(t *testing.T) {
errors := Errors{
newConfigError(errors.New("error1")),
newConfigError(errors.New("error2")),
NewConfigError(errors.New("error1")),
NewConfigError(errors.New("error2")),
}
assert.Equal(t, "2 errors: error1; error2", errors.Error())
})
Expand Down
27 changes: 27 additions & 0 deletions x-pack/libbeat/management/fleet/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
ph marked this conversation as resolved.
Show resolved Hide resolved
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package fleet

import (
xmanagement "github.com/elastic/beats/x-pack/libbeat/management"
)

// Config for central management
type Config struct {
Enabled bool `config:"enabled" yaml:"enabled"`
Mode string `config:"mode" yaml:"mode"`
Blacklist xmanagement.ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"`
}

func defaultConfig() *Config {
return &Config{
Mode: xmanagement.ModeCentralManagement,
Blacklist: xmanagement.ConfigBlacklistSettings{
Patterns: map[string]string{
"output": "console|file",
},
},
}
}
michalpristas marked this conversation as resolved.
Show resolved Hide resolved
56 changes: 56 additions & 0 deletions x-pack/libbeat/management/fleet/config_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package fleet

import (
"context"
"errors"
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/x-pack/agent/pkg/core/remoteconfig/grpc"
)

const (
defaultTimeout = 15 * time.Second
)

// Server is a server for handling communication between
// beat and Elastic Agent.
type Server struct {
configChan chan<- map[string]interface{}
}

// NewConfigServer creates a new grpc configuration server for receiving
// configurations from Elastic Agent.
func NewConfigServer(configChan chan<- map[string]interface{}) *Server {
return &Server{
configChan: configChan,
}
}

// Config is a handler of a call made by agent pushing latest configuration.
func (s *Server) Config(ctx context.Context, req *grpc.ConfigRequest) (*grpc.ConfigResponse, error) {
cfgString := req.GetConfig()

var configMap common.MapStr
uconfig, err := common.NewConfigFrom(cfgString)
if err != nil {
return &grpc.ConfigResponse{}, fmt.Errorf("config blocks unsuccessfully generated: %+v", err)
}

err = uconfig.Unpack(&configMap)
if err != nil {
return &grpc.ConfigResponse{}, fmt.Errorf("config blocks unsuccessfully generated: %+v", err)
}

select {
case s.configChan <- configMap:
case <-time.After(defaultTimeout):
return &grpc.ConfigResponse{}, errors.New("failed to push configuration: Timeout")
}
return &grpc.ConfigResponse{}, nil
}
Loading