Skip to content
This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

Update the shipper to work with the elastic-agent #185

Merged
merged 31 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
21c8fb2
handle output config, refactor
fearful-symmetry Nov 17, 2022
a02055b
clean up, tinkering with inputs
fearful-symmetry Nov 17, 2022
2c68e8e
Merge remote-tracking branch 'upstream/main' into setup-agent-config
fearful-symmetry Nov 17, 2022
486b66a
update go deps
fearful-symmetry Nov 18, 2022
e3d16f6
linter
fearful-symmetry Nov 18, 2022
ef4772c
add headers
fearful-symmetry Nov 18, 2022
9746559
update notice
fearful-symmetry Nov 18, 2022
fe7e4f8
continue to sync shipper with agent
fearful-symmetry Jan 19, 2023
97d37f8
Merge remote-tracking branch 'upstream/main' into setup-agent-config
fearful-symmetry Jan 19, 2023
4d17c78
try to make linter happy
fearful-symmetry Jan 23, 2023
a81454c
fix tests
fearful-symmetry Jan 23, 2023
9d1a866
add more comments, logic for unit state
fearful-symmetry Jan 24, 2023
2e269fe
update deps, clean up logging
fearful-symmetry Jan 25, 2023
2a4f57d
still tinkering with logging
fearful-symmetry Jan 25, 2023
a3dc3de
refactor to split input and output
fearful-symmetry Jan 27, 2023
28e9793
update deps
fearful-symmetry Jan 28, 2023
a13484c
update notice
fearful-symmetry Jan 28, 2023
446fec0
fix header
fearful-symmetry Jan 30, 2023
f197d39
change config unpacking
fearful-symmetry Feb 1, 2023
bcc639b
update deps
fearful-symmetry Feb 1, 2023
9e78d98
update notice
fearful-symmetry Feb 1, 2023
7de6b9b
change comments, adjust unit map behavior, add tests
fearful-symmetry Feb 2, 2023
9aa0b34
lint
fearful-symmetry Feb 2, 2023
35b9107
add tests, change unit map behavior
fearful-symmetry Feb 3, 2023
bf5fd83
tinker with tests
fearful-symmetry Feb 6, 2023
b76df30
tinkering with tests, still
fearful-symmetry Feb 6, 2023
4d75339
clean up
fearful-symmetry Feb 6, 2023
e69f074
add tests
fearful-symmetry Feb 6, 2023
5a5f835
add more tests
fearful-symmetry Feb 7, 2023
511c999
add comments, rename unit methods
fearful-symmetry Feb 8, 2023
5fc5662
change log change logic
fearful-symmetry Feb 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,060 changes: 2,030 additions & 2,030 deletions NOTICE.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func NewCommand() *cobra.Command {
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("d"))
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("environment"))
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("c"))
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("E"))

run := runCmd()

Expand Down
172 changes: 123 additions & 49 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,56 +8,99 @@ import (
"errors"
"flag"
"fmt"
"io/ioutil"
"os"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-ucfg/json"

"github.com/elastic/elastic-agent-shipper/monitoring"
"github.com/elastic/elastic-agent-shipper/output"
"github.com/elastic/elastic-agent-shipper/queue"
"github.com/elastic/elastic-agent-shipper/server"

"go.uber.org/zap/zapcore"
)

var (
configFilePath string
configFilePath string
// ErrConfigIsNotSet reports that no unmanaged config file has been set
ErrConfigIsNotSet = errors.New("config file is not set")
fs = flag.CommandLine
// Overwrites is the config map of CLI overwrites set by the -E flag
Overwrites = config.SettingFlag(fs, "E", "config overwrites")

esKey = "elasticsearch"
consoleKey = "console"
kafaKey = "kafka"
)

// A lot of the code here is the same as what's in elastic-agent, but it lives in an internal/ library
func init() {
fs := flag.CommandLine
fs.StringVar(&configFilePath, "c", "", "Run the shipper in the unmanaged mode and use the given configuration file instead")

}

// ShipperConnectionConfig is the shipper-relevant portion of the config received from input units
type ShipperConnectionConfig struct {
Server string `config:"server"`
TLS ShipperTLS `config:"ssl"`
}

// ShipperTLS is TLS-specific shipper client settings
type ShipperTLS struct {
CAs []string `config:"certificate_authorities"`
Cert string `config:"certificate"`
Key string `config:"key"`
}

// ShipperConfig defines the options present in the config file
// ShipperRootConfig defines the shipper config we get from elastic-agent's output unit
type ShipperRootConfig struct {
Type string `config:"type"`
Shipper ShipperConfig `config:"shipper"`
}

// ShipperConfig defines the config values stored under the `shipper` key in the fleet config
type ShipperConfig struct {
Log logp.Config `config:"logging"`
Monitor monitoring.Config `config:"monitoring"` //Queue monitoring settings
Queue queue.Config `config:"queue"` //Queue settings
Server server.Config `config:"server"` //gRPC Server settings
Output output.Config `config:"output"` //Output settings
// StrictMode means that every incoming event will be validated against the
// list of required fields. This introduces some additional overhead but can
// be really handy for client developers on the debugging stage.
// Normally, it should be disabled during production use and enabled for testing.
// In production it is preferable to send events to the output if at all possible.
StrictMode bool `config:"strict_mode"`
Server ShipperConnectionConfig // server settings, set by the input unit
}

// DefaultConfig returns a default config for the shipper
func DefaultConfig() ShipperRootConfig {
return ShipperRootConfig{
Type: esKey,
Shipper: ShipperConfig{
Monitor: monitoring.DefaultConfig(),
Queue: queue.DefaultConfig(),
},
}
}

// ReadConfigFromFile returns the populated config from the specified path
func ReadConfigFromFile() (ShipperConfig, error) {
func ReadConfigFromFile() (ShipperRootConfig, error) {
if configFilePath == "" {
return ShipperConfig{}, ErrConfigIsNotSet
return ShipperRootConfig{}, ErrConfigIsNotSet
}
contents, err := ioutil.ReadFile(configFilePath)
contents, err := os.ReadFile(configFilePath)
if err != nil {
return ShipperConfig{}, fmt.Errorf("error reading input file %s: %w", configFilePath, err)
return ShipperRootConfig{}, fmt.Errorf("error reading input file %s: %w", configFilePath, err)
}

raw, err := config.NewConfigWithYAML(contents, "")
if err != nil {
return ShipperConfig{}, fmt.Errorf("error reading config from yaml: %w", err)
return ShipperRootConfig{}, fmt.Errorf("error reading config from yaml: %w", err)
}

unpacker := func(cfg *ShipperConfig) error {
unpacker := func(cfg *ShipperRootConfig) error {
return raw.Unpack(cfg)
}

Expand All @@ -66,56 +109,87 @@ func ReadConfigFromFile() (ShipperConfig, error) {

// ShipperConfigFromUnitConfig converts the configuration provided by Agent to the internal
// configuration object used by the shipper.
// Currently this just converts the given struct to json and tries to deserialize it into the
// ShipperConfig struct. This is not the right way to do this, but this gets the build and
// tests passing again with the new version of elastic-agent-client. Migrating fully to this
// new config structure is part of the overall agent V2 transition, for more details see
// https://github.com/elastic/elastic-agent/issues/617.
func ShipperConfigFromUnitConfig(logLevel client.UnitLogLevel, config *proto.UnitExpectedConfig) (ShipperConfig, error) {
jsonConfig, err := config.GetSource().MarshalJSON()
if err != nil {
return ShipperConfig{}, err
func ShipperConfigFromUnitConfig(level client.UnitLogLevel, rawConfig *proto.UnitExpectedConfig) (ShipperRootConfig, error) {
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
cfgObject := DefaultConfig()

// set config based on the lowest log level
zapLevel := ZapFromUnitLogLevel(level)

if logp.GetLevel() != zapLevel {
logp.L().Debugf("Got new log level: %s", level.String())
logp.SetLevel(zapLevel)
}
return ReadConfigFromJSON(string(jsonConfig))
}

// ReadConfigFromJSON reads the event in from a JSON config. I believe @blakerouse told me
// that the V2 controller will send events via JSON, but I could be wrong.
func ReadConfigFromJSON(raw string) (ShipperConfig, error) {
rawCfg, err := json.NewConfig([]byte(raw))
// Generate basic config object from the source
// I would prefer to use the mapstructure library here,
// since it's more ergonomic, but we import a bunch of other structs
// into this config, all of which use our own `config` struct tag.
mapCfg := rawConfig.GetSource().AsMap()
cfg, err := config.NewConfigFrom(mapCfg)
if err != nil {
return ShipperConfig{}, fmt.Errorf("error parsing string config: %w", err)
return ShipperRootConfig{}, fmt.Errorf("error reading in raw map config: %w", err)
}

unpacker := func(cfg *ShipperConfig) error {
return rawCfg.Unpack(cfg)
// We should merge config overwrites here from the -E flag,
// but the 'path.*' variables set by elastic-agent conflict with the `path` flag in the elasticsearch settings
// see https://github.com/elastic/elastic-agent/issues/1729
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved

// output config is at the "root" level, so we need to unpack those manually
err = cfg.Unpack(&cfgObject)
if err != nil {
return ShipperRootConfig{}, fmt.Errorf("error unpacking shipper config: %w", err)
}
switch cfgObject.Type {
case esKey:
err = cfg.Unpack(&cfgObject.Shipper.Output.Elasticsearch)
if err != nil {
return ShipperRootConfig{}, fmt.Errorf("error reading elasticsearch output: %w", err)
}
case kafaKey:
err = cfg.Unpack(&cfgObject.Shipper.Output.Kafka)
if err != nil {
return ShipperRootConfig{}, fmt.Errorf("error reading elasticsearch output: %w", err)
}
case consoleKey:
err = cfg.Unpack(&cfgObject.Shipper.Output.Console)
if err != nil {
return ShipperRootConfig{}, fmt.Errorf("error reading console output: %w", err)
}
default:
return ShipperRootConfig{}, fmt.Errorf("error, could not find output for output key '%s'", cfgObject.Type)
}

return readConfig(unpacker)
return cfgObject, nil
}

type rawUnpacker func(cfg *ShipperConfig) error
type rawUnpacker func(cfg *ShipperRootConfig) error

func readConfig(unpacker rawUnpacker) (config ShipperConfig, err error) {
func readConfig(unpacker rawUnpacker) (config ShipperRootConfig, err error) {
// systemd environment will send us to stdout environment, which we want
config = ShipperConfig{
Log: logp.DefaultConfig(logp.SystemdEnvironment),
Monitor: monitoring.DefaultConfig(),
Queue: queue.DefaultConfig(),
Server: server.DefaultConfig(),
Output: output.DefaultConfig(),
}

config = DefaultConfig()
err = unpacker(&config)
if err != nil {
return config, fmt.Errorf("error unpacking shipper config: %w", err)
}

// otherwise the logging configuration is just ignored
err = logp.Configure(config.Log)
if err != nil {
return config, fmt.Errorf("error configuring the logger: %w", err)
}

return config, nil
}

// ZapFromUnitLogLevel converts the log level used by the units API, to the one used by the logger
func ZapFromUnitLogLevel(level client.UnitLogLevel) zapcore.Level {
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
newLevel := zapcore.InfoLevel
switch level {
case client.UnitLogLevelInfo:
newLevel = zapcore.InfoLevel
case client.UnitLogLevelDebug:
newLevel = zapcore.DebugLevel
case client.UnitLogLevelWarn:
newLevel = zapcore.WarnLevel
case client.UnitLogLevelError:
newLevel = zapcore.ErrorLevel
case client.UnitLogLevelTrace:
// zapcore has no trace level
newLevel = zapcore.DebugLevel
}
return newLevel
}
49 changes: 49 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package config

import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/logp"
)

func TestConfig(t *testing.T) {

testSource, err := structpb.NewStruct(map[string]interface{}{
"type": "elasticsearch",
"enabled": true,
"hosts": []interface{}{"localhost:9200"},
"username": "elastic",
"password": "changeme",
})
require.NoError(t, err)

testConfig := &proto.UnitExpectedConfig{
Source: testSource,
}
// check the log level as well
logp.SetLevel(zapcore.InfoLevel)

shipperCfg, err := ShipperConfigFromUnitConfig(client.UnitLogLevelDebug, testConfig)
require.NoError(t, err)

require.Equal(t, logp.GetLevel(), zapcore.DebugLevel)

require.NotNil(t, shipperCfg.Shipper.Output.Elasticsearch)
require.Equal(t, "elastic", shipperCfg.Shipper.Output.Elasticsearch.Username)
require.Equal(t, "changeme", shipperCfg.Shipper.Output.Elasticsearch.Password)
require.True(t, shipperCfg.Shipper.Output.Elasticsearch.Enabled)

require.Nil(t, shipperCfg.Shipper.Output.Console)
require.Nil(t, shipperCfg.Shipper.Output.Kafka)
require.Nil(t, shipperCfg.Shipper.Output.Logstash)
}
Loading