From 2297636b9f159526a77a29561df1a97d7bb93a18 Mon Sep 17 00:00:00 2001
From: Blake Rouse <blake.rouse@elastic.co>
Date: Tue, 14 Jul 2020 10:54:01 -0400
Subject: [PATCH] [Elastic Agent] Send Agent logs to elasticsearch (#19811)

* Work on logging twice.

* Work on agent logging to fleet.

* Commit example index strategy.

* More work on logging to ES.

* Revert change to release/version.go

* Fix indexes for metricbeat sidecars.

* Add to changelog.

* Fix fmt.

* Don't expose zapLevel, add ConfigureWithOutputs.

* Update comment.

* Update comment.
---
 libbeat/logp/configure/logging.go             |  17 +++
 libbeat/logp/core.go                          |  69 +++++++++
 libbeat/logp/encoding.go                      |  14 +-
 x-pack/elastic-agent/CHANGELOG.asciidoc       |   1 +
 .../pkg/agent/application/paths/paths.go      |  10 +-
 x-pack/elastic-agent/pkg/agent/cmd/common.go  |   1 +
 .../pkg/agent/operation/monitoring.go         | 143 +++++++++++-------
 .../elastic-agent/pkg/core/logger/logger.go   |  42 ++++-
 .../core/monitoring/beats/beats_monitor.go    |   4 +
 .../pkg/core/monitoring/beats/monitoring.go   |   4 +-
 10 files changed, 230 insertions(+), 75 deletions(-)

diff --git a/libbeat/logp/configure/logging.go b/libbeat/logp/configure/logging.go
index 6e4d60ece1f3..ceb295eab800 100644
--- a/libbeat/logp/configure/logging.go
+++ b/libbeat/logp/configure/logging.go
@@ -22,6 +22,8 @@ import (
 	"fmt"
 	"strings"
 
+	"go.uber.org/zap/zapcore"
+
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/logp"
 )
@@ -58,6 +60,21 @@ func Logging(beatName string, cfg *common.Config) error {
 	return logp.Configure(config)
 }
 
+// Logging builds a logp.Config based on the given common.Config and the specified
+// CLI flags along with the given outputs.
+func LoggingWithOutputs(beatName string, cfg *common.Config, outputs ...zapcore.Core) error {
+	config := logp.DefaultConfig(environment)
+	config.Beat = beatName
+	if cfg != nil {
+		if err := cfg.Unpack(&config); err != nil {
+			return err
+		}
+	}
+
+	applyFlags(&config)
+	return logp.ConfigureWithOutputs(config, outputs...)
+}
+
 func applyFlags(cfg *logp.Config) {
 	if toStderr {
 		cfg.ToStderr = true
diff --git a/libbeat/logp/core.go b/libbeat/logp/core.go
index e5a4f94e8ee8..afb4f57378dc 100644
--- a/libbeat/logp/core.go
+++ b/libbeat/logp/core.go
@@ -27,6 +27,8 @@ import (
 	"sync/atomic"
 	"unsafe"
 
+	"github.com/hashicorp/go-multierror"
+
 	"github.com/pkg/errors"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -62,6 +64,13 @@ type coreLogger struct {
 
 // Configure configures the logp package.
 func Configure(cfg Config) error {
+	return ConfigureWithOutputs(cfg)
+}
+
+// XXX: ConfigureWithOutputs is used by elastic-agent only (See file: x-pack/elastic-agent/pkg/core/logger/logger.go).
+// The agent requires that the output specified in the config object is configured and merged with the
+// logging outputs given.
+func ConfigureWithOutputs(cfg Config, outputs ...zapcore.Core) error {
 	var (
 		sink         zapcore.Core
 		observedLogs *observer.ObservedLogs
@@ -105,6 +114,7 @@ func Configure(cfg Config) error {
 		sink = selectiveWrapper(sink, selectors)
 	}
 
+	sink = newMultiCore(append(outputs, sink)...)
 	root := zap.New(sink, makeOptions(cfg)...)
 	storeLogger(&coreLogger{
 		selectors:    selectors,
@@ -262,3 +272,62 @@ func storeLogger(l *coreLogger) {
 	}
 	atomic.StorePointer(&_log, unsafe.Pointer(l))
 }
+
+// newMultiCore creates a sink that sends to multiple cores.
+func newMultiCore(cores ...zapcore.Core) zapcore.Core {
+	return &multiCore{cores}
+}
+
+// multiCore allows multiple cores to be used for logging.
+type multiCore struct {
+	cores []zapcore.Core
+}
+
+// Enabled returns true if the level is enabled in any one of the cores.
+func (m multiCore) Enabled(level zapcore.Level) bool {
+	for _, core := range m.cores {
+		if core.Enabled(level) {
+			return true
+		}
+	}
+	return false
+}
+
+// With creates a new multiCore with each core set with the given fields.
+func (m multiCore) With(fields []zapcore.Field) zapcore.Core {
+	cores := make([]zapcore.Core, len(m.cores))
+	for i, core := range m.cores {
+		cores[i] = core.With(fields)
+	}
+	return &multiCore{cores}
+}
+
+// Check will place each core that checks for that entry.
+func (m multiCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
+	for _, core := range m.cores {
+		checked = core.Check(entry, checked)
+	}
+	return checked
+}
+
+// Write writes the entry to each core.
+func (m multiCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
+	var errs error
+	for _, core := range m.cores {
+		if err := core.Write(entry, fields); err != nil {
+			errs = multierror.Append(errs, err)
+		}
+	}
+	return errs
+}
+
+// Sync syncs each core.
+func (m multiCore) Sync() error {
+	var errs error
+	for _, core := range m.cores {
+		if err := core.Sync(); err != nil {
+			errs = multierror.Append(errs, err)
+		}
+	}
+	return errs
+}
diff --git a/libbeat/logp/encoding.go b/libbeat/logp/encoding.go
index 7c3e56507c07..b1977285602c 100644
--- a/libbeat/logp/encoding.go
+++ b/libbeat/logp/encoding.go
@@ -44,13 +44,13 @@ func buildEncoder(cfg Config) zapcore.Encoder {
 	var encCfg zapcore.EncoderConfig
 	var encCreator encoderCreator
 	if cfg.JSON {
-		encCfg = jsonEncoderConfig()
+		encCfg = JSONEncoderConfig()
 		encCreator = zapcore.NewJSONEncoder
 	} else if cfg.ToSyslog {
-		encCfg = syslogEncoderConfig()
+		encCfg = SyslogEncoderConfig()
 		encCreator = zapcore.NewConsoleEncoder
 	} else {
-		encCfg = consoleEncoderConfig()
+		encCfg = ConsoleEncoderConfig()
 		encCreator = zapcore.NewConsoleEncoder
 	}
 
@@ -60,19 +60,19 @@ func buildEncoder(cfg Config) zapcore.Encoder {
 	return encCreator(encCfg)
 }
 
-func jsonEncoderConfig() zapcore.EncoderConfig {
+func JSONEncoderConfig() zapcore.EncoderConfig {
 	return baseEncodingConfig
 }
 
-func consoleEncoderConfig() zapcore.EncoderConfig {
+func ConsoleEncoderConfig() zapcore.EncoderConfig {
 	c := baseEncodingConfig
 	c.EncodeLevel = zapcore.CapitalLevelEncoder
 	c.EncodeName = bracketedNameEncoder
 	return c
 }
 
-func syslogEncoderConfig() zapcore.EncoderConfig {
-	c := consoleEncoderConfig()
+func SyslogEncoderConfig() zapcore.EncoderConfig {
+	c := ConsoleEncoderConfig()
 	// Time is generally added by syslog.
 	// But when logging with ECS the empty TimeKey will be
 	// ignored and @timestamp is still added to log line
diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc
index 855041d47b36..b2a2cd41ee83 100644
--- a/x-pack/elastic-agent/CHANGELOG.asciidoc
+++ b/x-pack/elastic-agent/CHANGELOG.asciidoc
@@ -82,3 +82,4 @@
 - Refuse invalid stream values in configuration {pull}19587[19587]
 - Agent now load balances across multiple Kibana instances {pull}19628[19628]
 - Configuration cleanup {pull}19848[19848]
+- Agent now sends its own logs to elasticsearch {pull}19811[19811]
diff --git a/x-pack/elastic-agent/pkg/agent/application/paths/paths.go b/x-pack/elastic-agent/pkg/agent/application/paths/paths.go
index a45000b40ae5..791c473c03ff 100644
--- a/x-pack/elastic-agent/pkg/agent/application/paths/paths.go
+++ b/x-pack/elastic-agent/pkg/agent/application/paths/paths.go
@@ -13,6 +13,7 @@ import (
 var (
 	homePath string
 	dataPath string
+	logsPath string
 )
 
 func init() {
@@ -21,6 +22,7 @@ func init() {
 	fs := flag.CommandLine
 	fs.StringVar(&homePath, "path.home", exePath, "Agent root path")
 	fs.StringVar(&dataPath, "path.data", filepath.Join(exePath, "data"), "Data path contains Agent managed binaries")
+	fs.StringVar(&logsPath, "path.logs", exePath, "Logs path contains Agent log output")
 }
 
 // Home returns a directory where binary lives
@@ -29,13 +31,17 @@ func Home() string {
 	return homePath
 }
 
-// Data returns a home directory of current user
+// Data returns the data directory for Agent
 func Data() string {
 	return dataPath
 }
 
-func retrieveExecutablePath() string {
+// Logs returns a the log directory for Agent
+func Logs() string {
+	return logsPath
+}
 
+func retrieveExecutablePath() string {
 	execPath, err := os.Executable()
 	if err != nil {
 		panic(err)
diff --git a/x-pack/elastic-agent/pkg/agent/cmd/common.go b/x-pack/elastic-agent/pkg/agent/cmd/common.go
index 54b51202ef57..080406e8c9e9 100644
--- a/x-pack/elastic-agent/pkg/agent/cmd/common.go
+++ b/x-pack/elastic-agent/pkg/agent/cmd/common.go
@@ -52,6 +52,7 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command {
 
 	cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.home"))
 	cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.data"))
+	cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.logs"))
 
 	cmd.PersistentFlags().StringVarP(&flags.PathConfigFile, "", "c", defaultConfig, fmt.Sprintf(`Configuration file, relative to path.config (default "%s")`, defaultConfig))
 	cmd.PersistentFlags().StringVarP(&flags.PathConfig, "path.config", "", "${path.home}", "Configuration path")
diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go
index 034e47be64af..bf03f4f34a5f 100644
--- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go
+++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go
@@ -6,9 +6,11 @@ package operation
 
 import (
 	"fmt"
+	"path/filepath"
 
 	"github.com/hashicorp/go-multierror"
 
+	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
@@ -16,12 +18,11 @@ import (
 )
 
 const (
-	monitoringName          = "FLEET_MONITORING"
-	outputKey               = "output"
-	monitoringEnabledSubkey = "enabled"
-	logsProcessName         = "filebeat"
-	metricsProcessName      = "metricbeat"
-	artifactPrefix          = "beats"
+	monitoringName     = "FLEET_MONITORING"
+	outputKey          = "output"
+	logsProcessName    = "filebeat"
+	metricsProcessName = "metricbeat"
+	artifactPrefix     = "beats"
 )
 
 func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
@@ -174,37 +175,62 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [
 }
 
 func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]interface{}, bool) {
-	paths := o.getLogFilePaths()
-	if len(paths) == 0 {
-		return nil, false
-	}
-
-	result := map[string]interface{}{
-		"filebeat": map[string]interface{}{
-			"inputs": []interface{}{
-				map[string]interface{}{
-					"type": "log",
-					"multiline": map[string]interface{}{
-						"pattern": "^[0-9]{4}",
-						"negate":  true,
-						"match":   "after",
+	inputs := []interface{}{
+		map[string]interface{}{
+			"type": "log",
+			"json": map[string]interface{}{
+				"keys_under_root": true,
+				"overwrite_keys":  true,
+				"message_key":     "message",
+			},
+			"paths": []string{
+				filepath.Join(paths.Data(), "logs", "elastic-agent-json.log"),
+			},
+			"index": "logs-elastic.agent-default",
+			"processors": []map[string]interface{}{
+				{
+					"add_fields": map[string]interface{}{
+						"target": "dataset",
+						"fields": map[string]interface{}{
+							"type":      "logs",
+							"name":      "elastic.agent",
+							"namespace": "default",
+						},
 					},
-					"paths": paths,
-					"index": "logs-agent-default",
-					"processors": []map[string]interface{}{
-						{
-							"add_fields": map[string]interface{}{
-								"target": "dataset",
-								"fields": map[string]interface{}{
-									"type":      "logs",
-									"name":      "agent",
-									"namespace": "default",
-								},
+				},
+			},
+		},
+	}
+	logPaths := o.getLogFilePaths()
+	if len(logPaths) > 0 {
+		for name, paths := range logPaths {
+			inputs = append(inputs, map[string]interface{}{
+				"type": "log",
+				"json": map[string]interface{}{
+					"keys_under_root": true,
+					"overwrite_keys":  true,
+					"message_key":     "message",
+				},
+				"paths": paths,
+				"index": fmt.Sprintf("logs-elastic.agent.%s-default", name),
+				"processors": []map[string]interface{}{
+					{
+						"add_fields": map[string]interface{}{
+							"target": "dataset",
+							"fields": map[string]interface{}{
+								"type":      "logs",
+								"name":      fmt.Sprintf("elastic.agent.%s", name),
+								"namespace": "default",
 							},
 						},
 					},
 				},
-			},
+			})
+		}
+	}
+	result := map[string]interface{}{
+		"filebeat": map[string]interface{}{
+			"inputs": inputs,
 		},
 		"output": map[string]interface{}{
 			"elasticsearch": output,
@@ -221,30 +247,31 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
 	if len(hosts) == 0 {
 		return nil, false
 	}
-
-	result := map[string]interface{}{
-		"metricbeat": map[string]interface{}{
-			"modules": []interface{}{
-				map[string]interface{}{
-					"module":     "beat",
-					"metricsets": []string{"stats", "state"},
-					"period":     "10s",
-					"hosts":      hosts,
-					"index":      "metrics-agent-default",
-					"processors": []map[string]interface{}{
-						{
-							"add_fields": map[string]interface{}{
-								"target": "dataset",
-								"fields": map[string]interface{}{
-									"type":      "metrics",
-									"name":      "agent",
-									"namespace": "default",
-								},
-							},
+	var modules []interface{}
+	for name, endpoints := range hosts {
+		modules = append(modules, map[string]interface{}{
+			"module":     "beat",
+			"metricsets": []string{"stats", "state"},
+			"period":     "10s",
+			"hosts":      endpoints,
+			"index":      fmt.Sprintf("metrics-elastic.agent.%s-default", name),
+			"processors": []map[string]interface{}{
+				{
+					"add_fields": map[string]interface{}{
+						"target": "dataset",
+						"fields": map[string]interface{}{
+							"type":      "metrics",
+							"name":      fmt.Sprintf("elastic.agent.%s", name),
+							"namespace": "default",
 						},
 					},
 				},
 			},
+		})
+	}
+	result := map[string]interface{}{
+		"metricbeat": map[string]interface{}{
+			"modules": modules,
 		},
 		"output": map[string]interface{}{
 			"elasticsearch": output,
@@ -256,8 +283,8 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
 	return result, true
 }
 
-func (o *Operator) getLogFilePaths() []string {
-	var paths []string
+func (o *Operator) getLogFilePaths() map[string][]string {
+	paths := map[string][]string{}
 
 	o.appsLock.Lock()
 	defer o.appsLock.Unlock()
@@ -265,15 +292,15 @@ func (o *Operator) getLogFilePaths() []string {
 	for _, a := range o.apps {
 		logPath := a.Monitor().LogPath(a.Name(), o.pipelineID)
 		if logPath != "" {
-			paths = append(paths, logPath)
+			paths[a.Name()] = append(paths[a.Name()], logPath)
 		}
 	}
 
 	return paths
 }
 
-func (o *Operator) getMetricbeatEndpoints() []string {
-	var endpoints []string
+func (o *Operator) getMetricbeatEndpoints() map[string][]string {
+	endpoints := map[string][]string{}
 
 	o.appsLock.Lock()
 	defer o.appsLock.Unlock()
@@ -281,7 +308,7 @@ func (o *Operator) getMetricbeatEndpoints() []string {
 	for _, a := range o.apps {
 		metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Name(), o.pipelineID)
 		if metricEndpoint != "" {
-			endpoints = append(endpoints, metricEndpoint)
+			endpoints[a.Name()] = append(endpoints[a.Name()], metricEndpoint)
 		}
 	}
 
diff --git a/x-pack/elastic-agent/pkg/core/logger/logger.go b/x-pack/elastic-agent/pkg/core/logger/logger.go
index f7559bedd3b4..0a73f36f08fd 100644
--- a/x-pack/elastic-agent/pkg/core/logger/logger.go
+++ b/x-pack/elastic-agent/pkg/core/logger/logger.go
@@ -6,11 +6,15 @@ package logger
 
 import (
 	"fmt"
+	"os"
 	"path/filepath"
 
+	"go.elastic.co/ecszap"
+	"go.uber.org/zap/zapcore"
 	"gopkg.in/yaml.v2"
 
 	"github.com/elastic/beats/v7/libbeat/common"
+	"github.com/elastic/beats/v7/libbeat/common/file"
 	"github.com/elastic/beats/v7/libbeat/logp"
 	"github.com/elastic/beats/v7/libbeat/logp/configure"
 	"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
@@ -50,11 +54,13 @@ func new(name string, cfg *Config) (*Logger, error) {
 	if err != nil {
 		return nil, err
 	}
-
-	if err := configure.Logging("", commonCfg); err != nil {
+	internal, err := makeInternalFileOutput()
+	if err != nil {
+		return nil, err
+	}
+	if err := configure.LoggingWithOutputs("", commonCfg, internal); err != nil {
 		return nil, fmt.Errorf("error initializing logging: %v", err)
 	}
-
 	return logp.NewLogger(name), nil
 }
 
@@ -80,10 +86,34 @@ func toCommonConfig(cfg *Config) (*common.Config, error) {
 func DefaultLoggingConfig() *Config {
 	cfg := logp.DefaultConfig(logp.DefaultEnvironment)
 	cfg.Beat = agentName
-	cfg.ECSEnabled = true
 	cfg.Level = logp.DebugLevel
-	cfg.Files.Path = filepath.Join(paths.Home(), "data", "logs")
-	cfg.Files.Name = agentName
+	cfg.Files.Path = paths.Logs()
+	cfg.Files.Name = fmt.Sprintf("%s.log", agentName)
 
 	return &cfg
 }
+
+// makeInternalFileOutput creates a zapcore.Core logger that cannot be changed with configuration.
+//
+// This is the logger that the spawned filebeat expects to read the log file from and ship to ES.
+func makeInternalFileOutput() (zapcore.Core, error) {
+	// defaultCfg is used to set the defaults for the file rotation of the internal logging
+	// these settings cannot be changed by a user configuration
+	defaultCfg := logp.DefaultConfig(logp.DefaultEnvironment)
+	filename := filepath.Join(paths.Data(), "logs", fmt.Sprintf("%s-json.log", agentName))
+
+	rotator, err := file.NewFileRotator(filename,
+		file.MaxSizeBytes(defaultCfg.Files.MaxSize),
+		file.MaxBackups(defaultCfg.Files.MaxBackups),
+		file.Permissions(os.FileMode(defaultCfg.Files.Permissions)),
+		file.Interval(defaultCfg.Files.Interval),
+		file.RotateOnStartup(defaultCfg.Files.RotateOnStartup),
+		file.RedirectStderr(defaultCfg.Files.RedirectStderr),
+	)
+	if err != nil {
+		return nil, errors.New("failed to create internal file rotator")
+	}
+
+	encoder := zapcore.NewJSONEncoder(ecszap.ECSCompatibleEncoderConfig(logp.JSONEncoderConfig()))
+	return ecszap.WrapCore(zapcore.NewCore(encoder, rotator, zapcore.DebugLevel)), nil
+}
diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go
index e1c6d2b153df..53b9f377fcdb 100644
--- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go
+++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go
@@ -5,6 +5,7 @@
 package beats
 
 import (
+	"fmt"
 	"net/url"
 	"os"
 	"path/filepath"
@@ -109,7 +110,10 @@ func (b *Monitor) EnrichArgs(process, pipelineID string, args []string, isSideca
 		if isSidecar {
 			logFile += "_monitor"
 		}
+		logFile = fmt.Sprintf("%s-json.log", logFile)
 		appendix = append(appendix,
+			"-E", "logging.json=true",
+			"-E", "logging.ecs=true",
 			"-E", "logging.files.path="+loggingPath,
 			"-E", "logging.files.name="+logFile,
 			"-E", "logging.files.keepfiles=7",
diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go
index 265ea4cda823..d38f7d5843cb 100644
--- a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go
+++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go
@@ -12,9 +12,9 @@ import (
 
 const (
 	// args: data path, pipeline name, application name
-	logFileFormat = "%s/logs/%s/%s"
+	logFileFormat = "%s/logs/%s/%s-json.log"
 	// args: data path, install path, pipeline name, application name
-	logFileFormatWin = "%s\\logs\\%s\\%s"
+	logFileFormatWin = "%s\\logs\\%s\\%s-json.log"
 
 	// args: pipeline name, application name
 	mbEndpointFileFormat = "unix:///tmp/elastic-agent/%s/%s/%s.sock"