From 2d5a12bc10a94f9c7f09d80ff0efc920421eaf4b Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 8 Feb 2019 09:25:40 -0800 Subject: [PATCH 01/59] WIP: Debugging docker-compose failure --- libbeat/scripts/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/scripts/Makefile b/libbeat/scripts/Makefile index a007eca7b29..8e34eb4c820 100755 --- a/libbeat/scripts/Makefile +++ b/libbeat/scripts/Makefile @@ -208,7 +208,7 @@ integration-tests-environment: prepare-tests build-image # # This will make docker-compose command to display the logs on stdout on error, It's not enabled # by default because it can create noise if the test inside the container fails. - ${DOCKER_COMPOSE} run beat make integration-tests RACE_DETECTOR=$(RACE_DETECTOR) DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} + ${DOCKER_COMPOSE} run beat make integration-tests RACE_DETECTOR=$(RACE_DETECTOR) DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} || ${DOCKER_COMPOSE} logs --tail 200 # Runs the system tests .PHONY: system-tests From 29e1e04192e97044f9b1f93badc7290f707a7fa1 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 8 Feb 2019 09:54:55 -0800 Subject: [PATCH 02/59] [WIP] Add docker compose logging for system tests as well --- libbeat/scripts/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/scripts/Makefile b/libbeat/scripts/Makefile index 8e34eb4c820..b019462b70e 100755 --- a/libbeat/scripts/Makefile +++ b/libbeat/scripts/Makefile @@ -221,7 +221,7 @@ system-tests: prepare-tests ${BEAT_NAME}.test python-env .PHONY: system-tests-environment system-tests-environment: ## @testing Runs the system tests inside a virtual environment. This can be run on any docker-machine (local, remote) system-tests-environment: prepare-tests build-image - ${DOCKER_COMPOSE} run -e INTEGRATION_TESTS=1 -e TESTING_ENVIRONMENT=${TESTING_ENVIRONMENT} -e DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} beat make system-tests + ${DOCKER_COMPOSE} run -e INTEGRATION_TESTS=1 -e TESTING_ENVIRONMENT=${TESTING_ENVIRONMENT} -e DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} beat make system-tests || ${DOCKER_COMPOSE} logs --tail 200 .PHONY: fast-system-tests fast-system-tests: ## @testing Runs system tests without coverage reports and in parallel From fe07647af12251fb318f9080425df1475f397ae5 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 8 Feb 2019 19:40:05 -0800 Subject: [PATCH 03/59] Removing logging statements used for debugging --- libbeat/scripts/Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/scripts/Makefile b/libbeat/scripts/Makefile index b019462b70e..a007eca7b29 100755 --- a/libbeat/scripts/Makefile +++ b/libbeat/scripts/Makefile @@ -208,7 +208,7 @@ integration-tests-environment: prepare-tests build-image # # This will make docker-compose command to display the logs on stdout on error, It's not enabled # by default because it can create noise if the test inside the container fails. - ${DOCKER_COMPOSE} run beat make integration-tests RACE_DETECTOR=$(RACE_DETECTOR) DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} || ${DOCKER_COMPOSE} logs --tail 200 + ${DOCKER_COMPOSE} run beat make integration-tests RACE_DETECTOR=$(RACE_DETECTOR) DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} # Runs the system tests .PHONY: system-tests @@ -221,7 +221,7 @@ system-tests: prepare-tests ${BEAT_NAME}.test python-env .PHONY: system-tests-environment system-tests-environment: ## @testing Runs the system tests inside a virtual environment. This can be run on any docker-machine (local, remote) system-tests-environment: prepare-tests build-image - ${DOCKER_COMPOSE} run -e INTEGRATION_TESTS=1 -e TESTING_ENVIRONMENT=${TESTING_ENVIRONMENT} -e DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} beat make system-tests || ${DOCKER_COMPOSE} logs --tail 200 + ${DOCKER_COMPOSE} run -e INTEGRATION_TESTS=1 -e TESTING_ENVIRONMENT=${TESTING_ENVIRONMENT} -e DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} beat make system-tests .PHONY: fast-system-tests fast-system-tests: ## @testing Runs system tests without coverage reports and in parallel From 93ff6fd5af99db6bc39e308d0030cc33e090a4ce Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 22 Nov 2018 01:12:15 -0800 Subject: [PATCH 04/59] Fixing imports --- libbeat/cmd/instance/beat.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index f76f2047cf1..5b6bf514d2c 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -39,6 +39,10 @@ import ( errw "github.com/pkg/errors" "go.uber.org/zap" + sysinfo "github.com/elastic/go-sysinfo" + "github.com/elastic/go-sysinfo/types" + ucfg "github.com/elastic/go-ucfg" + "github.com/elastic/beats/libbeat/api" "github.com/elastic/beats/libbeat/asset" "github.com/elastic/beats/libbeat/beat" @@ -66,9 +70,6 @@ import ( "github.com/elastic/beats/libbeat/publisher/processing" svc "github.com/elastic/beats/libbeat/service" "github.com/elastic/beats/libbeat/version" - sysinfo "github.com/elastic/go-sysinfo" - "github.com/elastic/go-sysinfo/types" - ucfg "github.com/elastic/go-ucfg" ) // Beat provides the runnable and configurable instance of a beat. From 8a81dcd540fe73934888d5bbf77ccceb02e9179d Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 22 Nov 2018 01:12:26 -0800 Subject: [PATCH 05/59] Introducing monitoring.* config --- libbeat/cmd/instance/beat.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 5b6bf514d2c..5d60370ed41 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -49,6 +49,7 @@ import ( "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/cloudid" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/file" "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/common/seccomp" @@ -103,8 +104,9 @@ type beatConfig struct { Keystore *common.Config `config:"keystore"` // output/publishing related configurations - Pipeline pipeline.Config `config:",inline"` - Monitoring *common.Config `config:"xpack.monitoring"` + Pipeline pipeline.Config `config:",inline"` + Monitoring *common.Config `config:"xpack.monitoring"` + MonitoringNew *common.Config `config:"monitoring"` // central management settings Management *common.Config `config:"management"` @@ -362,11 +364,13 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { return err } - if b.Config.Monitoring.Enabled() { + if b.Config.MonitoringNew.Enabled() || b.Config.Monitoring.Enabled() { + monitoringCfg := chooseMonitoringConfig(b.Config) + settings := report.Settings{ DefaultUsername: settings.Monitoring.DefaultUsername, } - reporter, err := report.New(b.Info, settings, b.Config.Monitoring, b.Config.Output) + reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output) if err != nil { return err } @@ -946,3 +950,19 @@ func initPaths(cfg *common.Config) error { } return nil } + +func chooseMonitoringConfig(beatCfg beatConfig) (monitoringCfg *common.Config) { + if beatCfg.MonitoringNew.Enabled() && beatCfg.Monitoring.Enabled() { + cfgwarn.Deprecate("6.6.0", "both xpack.monitoring.* and monitoring.* cannot be set. Using monitoring.* which is preferred.") + monitoringCfg = beatCfg.MonitoringNew + monitoringCfg.SetString("dest", -1, "monitoring") + } else if beatCfg.Monitoring.Enabled() { + cfgwarn.Deprecate("6.6.0", "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.hosts to monitoring cluster hosts.") + monitoringCfg = beatCfg.Monitoring + monitoringCfg.SetString("dest", -1, "production") + } else { + monitoringCfg = beatCfg.MonitoringNew + monitoringCfg.SetString("dest", -1, "monitoring") + } + return +} From 215d9a351f0df0e7a8fb9460d16330bd6a35e8ba Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 22 Nov 2018 10:36:30 -0800 Subject: [PATCH 06/59] Determine reporter format (production or monitoring) --- libbeat/cmd/instance/beat.go | 17 +++++++++-------- .../monitoring/report/elasticsearch/client.go | 2 ++ libbeat/monitoring/report/report.go | 10 ++++++++-- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 5d60370ed41..7c19e958956 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -365,7 +365,10 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { } if b.Config.MonitoringNew.Enabled() || b.Config.Monitoring.Enabled() { - monitoringCfg := chooseMonitoringConfig(b.Config) + monitoringCfg, err := chooseMonitoringConfig(b.Config) + if err != nil { + return err + } settings := report.Settings{ DefaultUsername: settings.Monitoring.DefaultUsername, @@ -951,18 +954,16 @@ func initPaths(cfg *common.Config) error { return nil } -func chooseMonitoringConfig(beatCfg beatConfig) (monitoringCfg *common.Config) { +func chooseMonitoringConfig(beatCfg beatConfig) (monitoringCfg *common.Config, err error) { if beatCfg.MonitoringNew.Enabled() && beatCfg.Monitoring.Enabled() { - cfgwarn.Deprecate("6.6.0", "both xpack.monitoring.* and monitoring.* cannot be set. Using monitoring.* which is preferred.") - monitoringCfg = beatCfg.MonitoringNew - monitoringCfg.SetString("dest", -1, "monitoring") + err = fmt.Errorf("both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.hosts to monitoring cluster hosts") } else if beatCfg.Monitoring.Enabled() { - cfgwarn.Deprecate("6.6.0", "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.hosts to monitoring cluster hosts.") + cfgwarn.Deprecate("7.0", "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.hosts to monitoring cluster hosts") monitoringCfg = beatCfg.Monitoring - monitoringCfg.SetString("dest", -1, "production") + monitoringCfg.SetString("format", -1, "production") } else { monitoringCfg = beatCfg.MonitoringNew - monitoringCfg.SetString("dest", -1, "monitoring") + monitoringCfg.SetString("format", -1, "monitoring") } return } diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 63be94cf324..92c9907fdc8 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -84,6 +84,8 @@ func (c *publishClient) Connect() error { } debugf("XPack monitoring is enabled") + debugf("Publishing to %v cluster", c.params["format"]) + return nil } diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index 8e180de6aad..79d29d04e39 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -73,10 +73,10 @@ func New( } func getReporterConfig( - cfg *common.Config, + monitoringConfig *common.Config, outputs common.ConfigNamespace, ) (string, *common.Config, error) { - cfg = collectSubObject(cfg) + cfg := collectSubObject(monitoringConfig) config := defaultConfig if err := cfg.Unpack(&config); err != nil { return "", nil, err @@ -110,6 +110,12 @@ func getReporterConfig( rc = merged } + format, err := monitoringConfig.String("format", -1) + if err != nil { + return "", nil, err + } + rc.SetString("parameters.format", -1, format) + return name, rc, nil } From 1e206a7b124d8542b9cd62465244abf82c913e65 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 27 Nov 2018 19:13:27 -0800 Subject: [PATCH 07/59] WIP: Capture elasticsearch cluster_uuid on connect to ES output? --- libbeat/cmd/instance/beat.go | 29 +++++++++++++++++-- libbeat/outputs/elasticsearch/client.go | 2 +- .../outputs/elasticsearch/elasticsearch.go | 6 ++-- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 7c19e958956..a52a778bdee 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -767,8 +767,7 @@ func (b *Beat) registerESIndexManagement() error { return nil } -// Build and return a callback to load index template into ES -func (b *Beat) indexSetupCallback() func(esClient *elasticsearch.Client) error { +func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback { return func(esClient *elasticsearch.Client) error { m := b.index.Manager(esClient, idxmgmt.BeatsAssets(b.Fields)) return m.Setup(true, true) @@ -798,6 +797,32 @@ func (b *Beat) createOutput(stats outputs.Observer, cfg common.ConfigNamespace) return outputs.Load(b.index, b.Info, stats, cfg.Name(), cfg.Config()) } +// Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring +func (b *Beat) clusterUUIDFetchingCalling() (elasticsearch.ConnectCallback, error) { + var response struct { + ClusterUUID string `json:"cluster_uuid"` + } + + callback := func(esClient *elasticsearch.Client) error { + status, body, err := esClient.Request("GET", "/", "", nil, nil) + if err != nil { + return errw.Wrap(err, "error querying /") + } + if status > 299 { + return fmt.Errorf("Error querying /. Status: %d. Response body: %s", status, body) + } + err = json.Unmarshal(body, &response) + if err != nil { + return fmt.Errorf("Error unmarshaling json when querying /. Body: %s", body) + } + + // TODO: "Save" cluster_uuid somewhere? State namespace? New namespace? + return nil + } + + return callback, nil +} + // handleError handles the given error by logging it and then returning the // error. If the err is nil or is a GracefulExit error then the method will // return nil without logging anything. diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 42f9e69939b..be23512dea5 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -77,7 +77,7 @@ type ClientSettings struct { Observer outputs.Observer } -type connectCallback func(client *Client) error +type ConnectCallback func(client *Client) error // Connection manages the connection for a given client. type Connection struct { diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 323f4c88a35..7d20e93cca3 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -54,7 +54,7 @@ var ( // Callbacks must not depend on the result of a previous one, // because the ordering is not fixed. type callbacksRegistry struct { - callbacks map[uuid.UUID]connectCallback + callbacks map[uuid.UUID]ConnectCallback mutex sync.Mutex } @@ -88,14 +88,14 @@ func RegisterGlobalCallback(callback connectCallback) (uuid.UUID, error) { func newCallbacksRegistry() callbacksRegistry { return callbacksRegistry{ - callbacks: make(map[uuid.UUID]connectCallback), + callbacks: make(map[uuid.UUID]ConnectCallback), } } // RegisterConnectCallback registers a callback for the elasticsearch output // The callback is called each time the client connects to elasticsearch. // It returns the key of the newly added callback, so it can be deregistered later. -func RegisterConnectCallback(callback connectCallback) (uuid.UUID, error) { +func RegisterConnectCallback(callback ConnectCallback) (uuid.UUID, error) { connectCallbackRegistry.mutex.Lock() defer connectCallbackRegistry.mutex.Unlock() From da4c13db089ae1b84e97cffc453d8b8793dc5393 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 27 Nov 2018 20:55:59 -0800 Subject: [PATCH 08/59] WIP: record cluster_uuid and then try to use it --- libbeat/cmd/instance/beat.go | 6 ++- .../report/elasticsearch/elasticsearch.go | 41 +++++++++++++++---- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index a52a778bdee..53784183025 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -816,7 +816,11 @@ func (b *Beat) clusterUUIDFetchingCalling() (elasticsearch.ConnectCallback, erro return fmt.Errorf("Error unmarshaling json when querying /. Body: %s", body) } - // TODO: "Save" cluster_uuid somewhere? State namespace? New namespace? + stateRegistry := monitoring.GetNamespace("state").GetRegistry() + outputsRegistry := stateRegistry.NewRegistry("outputs") + elasticsearchRegistry := outputsRegistry.NewRegistry("elasticsearch") + monitoring.NewString(elasticsearchRegistry, "cluster_uuid").Set(response.ClusterUUID) + return nil } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 59d4406a7bf..175e8c4e51c 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -255,12 +255,12 @@ func (r *reporter) initLoop(c config) { log.Info("Successfully connected to X-Pack Monitoring endpoint.") // Start collector and send loop if monitoring endpoint has been found. - go r.snapshotLoop("state", "state", c.StatePeriod) + go r.snapshotLoop("state", "state", getClusterUUID(), c.StatePeriod) // For backward compatibility stats is named to metrics. - go r.snapshotLoop("stats", "metrics", c.MetricsPeriod) + go r.snapshotLoop("stats", "metrics", getClusterUUID(), c.MetricsPeriod) } -func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration) { +func (r *reporter) snapshotLoop(namespace, prefix, clusterUUID string, period time.Duration) { ticker := time.NewTicker(period) defer ticker.Stop() @@ -291,15 +291,22 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration) if len(r.tags) > 0 { fields["tags"] = r.tags } + + meta := common.MapStr{ + "type": "beats_" + namespace, + "interval_ms": int64(period / time.Millisecond), + // Converting to seconds as interval only accepts `s` as unit + "params": map[string]string{"interval": strconv.Itoa(int(period/time.Second)) + "s"}, + } + + if clusterUUID != "" { + meta.Put("cluster_uuid", clusterUUID) + } + r.client.Publish(beat.Event{ Timestamp: ts, Fields: fields, - Meta: common.MapStr{ - "type": "beats_" + namespace, - "interval_ms": int64(period / time.Millisecond), - // Converting to seconds as interval only accepts `s` as unit - "params": map[string]string{"interval": strconv.Itoa(int(period/time.Second)) + "s"}, - }, + Meta: meta, }) } } @@ -367,3 +374,19 @@ func makeMeta(beat beat.Info) common.MapStr { "uuid": beat.ID, } } + +func getClusterUUID() string { + stateRegistry := monitoring.GetNamespace("state").GetRegistry() + outputsRegistry := stateRegistry.GetRegistry("outputs") + if outputsRegistry == nil { + return "" + } + + elasticsearchRegistry := outputsRegistry.GetRegistry("elasticearch") + if elasticsearchRegistry == nil { + return "" + } + + // FIXME: this isn't going to work and probably needs to be accessed via a visitor + return monitoring.NewString(elasticsearchRegistry, "cluster_uuid").Get() +} From 7a696e56909f3be727ab4e22621c6068792f64f6 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 27 Nov 2018 21:45:06 -0800 Subject: [PATCH 09/59] Extracting bulk to production code into function --- .../monitoring/report/elasticsearch/client.go | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 92c9907fdc8..7653ba2715e 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -122,23 +122,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } } - meta := common.MapStr{ - "_index": "", - "_routing": nil, - "_type": t, - } - bulk := [2]interface{}{ - common.MapStr{"index": meta}, - report.Event{ - Timestamp: event.Content.Timestamp, - Fields: event.Content.Fields, - }, - } - - // Currently one request per event is sent. Reason is that each event can contain different - // interval params and X-Pack requires to send the interval param. - _, err = c.es.SendMonitoringBulk(params, bulk[:]) - + err = c.bulkToProduction(params, event, t) if err != nil { failed = append(failed, event) reason = err @@ -160,3 +144,23 @@ func (c *publishClient) Test(d testing.Driver) { func (c *publishClient) String() string { return "publish(" + c.es.String() + ")" } + +func (c *publishClient) bulkToProduction(params map[string]string, event publisher.Event, docType interface{}) error { + meta := common.MapStr{ + "_index": "", + "_routing": nil, + "_type": t, + } + bulk := [2]interface{}{ + common.MapStr{"index": meta}, + report.Event{ + Timestamp: event.Content.Timestamp, + Fields: event.Content.Fields, + } + } + + // Currently one request per event is sent. Reason is that each event can contain different + // interval params and X-Pack requires to send the interval param. + _, err = c.es.SendMonitoringBulk(params, bulk[:]) + return err +} From 6d0b0e6f017f8b538e9c12bdf71f98fbb15ef80e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 27 Nov 2018 22:22:49 -0800 Subject: [PATCH 10/59] Add switch for sending to production or monitoring cluster --- .../monitoring/report/elasticsearch/client.go | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 7653ba2715e..b5095cda159 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -122,7 +122,15 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } } - err = c.bulkToProduction(params, event, t) + switch params["format"] { + case "production": + err = c.bulkToProduction(params, event, t) + case "monitoring": + err = c.bulkToMonitoring(event) + default: + err = fmt.Errorf("unsupported reporting format: %v", params["format"]) + } + if err != nil { failed = append(failed, event) reason = err @@ -164,3 +172,25 @@ func (c *publishClient) bulkToProduction(params map[string]string, event publish _, err = c.es.SendMonitoringBulk(params, bulk[:]) return err } + +// TODO: figure out why this isn't actually indexing anything! +func (c *publishClient) bulkToMonitoring(event publisher.Event) error { + action := common.MapStr{ + "index": common.MapStr{ + "_type": "doc", + "_index": "monitoring-beats-6-1", // FIXME + "_routing": nil, + }, + } + document := report.Event{ + Timestamp: event.Content.Timestamp, + Fields: event.Content.Fields, + } + bulk := [2]interface{}{action, document} + + // Currently one request per event is sent. Reason is that each event can contain different + // interval params and X-Pack requires to send the interval param. + // FIXME: index name (first param below) + _, err := c.es.BulkWith("monitoring-beats-6-1", "doc", nil, nil, bulk[:]) + return err +} From 27c11a722b60e3e0139aea5c7a01dfba091e5c6e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 11 Dec 2018 02:47:41 -0800 Subject: [PATCH 11/59] Better if check --- libbeat/cmd/instance/beat.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 53784183025..df114b81bb5 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -364,12 +364,9 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { return err } - if b.Config.MonitoringNew.Enabled() || b.Config.Monitoring.Enabled() { - monitoringCfg, err := chooseMonitoringConfig(b.Config) - if err != nil { - return err - } - + if monitoringCfg, err := selectMonitoringConfig(b.Config); err != nil { + return err + } else if monitoringCfg.Enabled() { settings := report.Settings{ DefaultUsername: settings.Monitoring.DefaultUsername, } From 4c196afe4534c7b1af161fad8fea129ead3351b9 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 11 Dec 2018 02:47:52 -0800 Subject: [PATCH 12/59] Using switch instead of if-else --- libbeat/cmd/instance/beat.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index df114b81bb5..8e42a5cd234 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -980,16 +980,22 @@ func initPaths(cfg *common.Config) error { return nil } -func chooseMonitoringConfig(beatCfg beatConfig) (monitoringCfg *common.Config, err error) { - if beatCfg.MonitoringNew.Enabled() && beatCfg.Monitoring.Enabled() { - err = fmt.Errorf("both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.hosts to monitoring cluster hosts") - } else if beatCfg.Monitoring.Enabled() { - cfgwarn.Deprecate("7.0", "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.hosts to monitoring cluster hosts") - monitoringCfg = beatCfg.Monitoring +func selectMonitoringConfig(beatCfg beatConfig) (*common.Config, error) { + switch { + case beatCfg.MonitoringNew.Enabled() && beatCfg.Monitoring.Enabled(): + const errMonitoringBothConfigEnabled = "both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.hosts to monitoring cluster hosts" + return nil, errors.New(errMonitoringBothConfigEnabled) + case beatCfg.Monitoring.Enabled(): + const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.hosts to monitoring cluster hosts" + cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) + monitoringCfg := beatCfg.Monitoring monitoringCfg.SetString("format", -1, "production") - } else { - monitoringCfg = beatCfg.MonitoringNew + return monitoringCfg, nil + case beatCfg.MonitoringNew.Enabled(): + monitoringCfg := beatCfg.MonitoringNew monitoringCfg.SetString("format", -1, "monitoring") + return monitoringCfg, nil + default: + return nil, nil } - return } From edc68e821a88c7af8c27001b29e9b2bfc562a8dd Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 14 Dec 2018 15:17:40 -0800 Subject: [PATCH 13/59] Add "_type": "doc" if ES version < 7 --- libbeat/monitoring/report/elasticsearch/client.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index b5095cda159..91236ff7add 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -177,11 +177,17 @@ func (c *publishClient) bulkToProduction(params map[string]string, event publish func (c *publishClient) bulkToMonitoring(event publisher.Event) error { action := common.MapStr{ "index": common.MapStr{ - "_type": "doc", "_index": "monitoring-beats-6-1", // FIXME "_routing": nil, }, } + + esVersion := c.es.GetVersion() + v7 := common.MustNewVersion("7.0.0") + if esVersion.LessThan(v7) { + action.Put("index._type", "doc") + } + document := report.Event{ Timestamp: event.Content.Timestamp, Fields: event.Content.Fields, From eb86660941d6f7a84fb1883100fe1366822f2851 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 14 Dec 2018 15:23:55 -0800 Subject: [PATCH 14/59] Rename format -> _format since it's for internal use only --- libbeat/cmd/instance/beat.go | 4 ++-- libbeat/monitoring/report/elasticsearch/client.go | 6 +++--- libbeat/monitoring/report/report.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 8e42a5cd234..b4f1a497477 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -989,11 +989,11 @@ func selectMonitoringConfig(beatCfg beatConfig) (*common.Config, error) { const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.hosts to monitoring cluster hosts" cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) monitoringCfg := beatCfg.Monitoring - monitoringCfg.SetString("format", -1, "production") + monitoringCfg.SetString("_format", -1, "production") return monitoringCfg, nil case beatCfg.MonitoringNew.Enabled(): monitoringCfg := beatCfg.MonitoringNew - monitoringCfg.SetString("format", -1, "monitoring") + monitoringCfg.SetString("_format", -1, "monitoring") return monitoringCfg, nil default: return nil, nil diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 91236ff7add..32ce6a4640f 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -84,7 +84,7 @@ func (c *publishClient) Connect() error { } debugf("XPack monitoring is enabled") - debugf("Publishing to %v cluster", c.params["format"]) + debugf("Publishing to %v cluster", c.params["_format"]) return nil } @@ -122,13 +122,13 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } } - switch params["format"] { + switch params["_format"] { case "production": err = c.bulkToProduction(params, event, t) case "monitoring": err = c.bulkToMonitoring(event) default: - err = fmt.Errorf("unsupported reporting format: %v", params["format"]) + err = fmt.Errorf("unsupported reporting format: %v", params["_format"]) } if err != nil { diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index 79d29d04e39..04614798d43 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -110,11 +110,11 @@ func getReporterConfig( rc = merged } - format, err := monitoringConfig.String("format", -1) + format, err := monitoringConfig.String("_format", -1) if err != nil { return "", nil, err } - rc.SetString("parameters.format", -1, format) + rc.SetString("parameters._format", -1, format) return name, rc, nil } From fce82bd138a4119b4b061a13d313303204bd23dd Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 14 Dec 2018 15:30:04 -0800 Subject: [PATCH 15/59] Rename Monitoring -> XPackMonitoring and MonitoringNew -> Monitoring --- libbeat/cmd/instance/beat.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index b4f1a497477..d04ec18096b 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -104,9 +104,9 @@ type beatConfig struct { Keystore *common.Config `config:"keystore"` // output/publishing related configurations - Pipeline pipeline.Config `config:",inline"` - Monitoring *common.Config `config:"xpack.monitoring"` - MonitoringNew *common.Config `config:"monitoring"` + Pipeline pipeline.Config `config:",inline"` + XPackMonitoring *common.Config `config:"xpack.monitoring"` + Monitoring *common.Config `config:"monitoring"` // central management settings Management *common.Config `config:"management"` @@ -982,17 +982,17 @@ func initPaths(cfg *common.Config) error { func selectMonitoringConfig(beatCfg beatConfig) (*common.Config, error) { switch { - case beatCfg.MonitoringNew.Enabled() && beatCfg.Monitoring.Enabled(): + case beatCfg.Monitoring.Enabled() && beatCfg.XPackMonitoring.Enabled(): const errMonitoringBothConfigEnabled = "both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.hosts to monitoring cluster hosts" return nil, errors.New(errMonitoringBothConfigEnabled) - case beatCfg.Monitoring.Enabled(): + case beatCfg.XPackMonitoring.Enabled(): const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.hosts to monitoring cluster hosts" cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) - monitoringCfg := beatCfg.Monitoring + monitoringCfg := beatCfg.XPackMonitoring monitoringCfg.SetString("_format", -1, "production") return monitoringCfg, nil - case beatCfg.MonitoringNew.Enabled(): - monitoringCfg := beatCfg.MonitoringNew + case beatCfg.Monitoring.Enabled(): + monitoringCfg := beatCfg.Monitoring monitoringCfg.SetString("_format", -1, "monitoring") return monitoringCfg, nil default: From c6f841c3de31a6f22a753959e8f1045a62e928d0 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 14 Dec 2018 15:44:01 -0800 Subject: [PATCH 16/59] Use consts instead of bare strings --- libbeat/cmd/instance/beat.go | 4 ++-- libbeat/monitoring/report/elasticsearch/client.go | 4 ++-- libbeat/monitoring/report/report.go | 5 +++++ 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index d04ec18096b..cdff0e6ae62 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -989,11 +989,11 @@ func selectMonitoringConfig(beatCfg beatConfig) (*common.Config, error) { const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.hosts to monitoring cluster hosts" cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) monitoringCfg := beatCfg.XPackMonitoring - monitoringCfg.SetString("_format", -1, "production") + monitoringCfg.SetString("_format", -1, report.ReportingFormatProduction) return monitoringCfg, nil case beatCfg.Monitoring.Enabled(): monitoringCfg := beatCfg.Monitoring - monitoringCfg.SetString("_format", -1, "monitoring") + monitoringCfg.SetString("_format", -1, report.ReportingFormatMonitoring) return monitoringCfg, nil default: return nil, nil diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 32ce6a4640f..d6432c3bf46 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -123,9 +123,9 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } switch params["_format"] { - case "production": + case report.ReportingFormatProduction: err = c.bulkToProduction(params, event, t) - case "monitoring": + case report.ReportingFormatMonitoring: err = c.bulkToMonitoring(event) default: err = fmt.Errorf("unsupported reporting format: %v", params["_format"]) diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index 04614798d43..7e7ee575432 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -25,6 +25,11 @@ import ( "github.com/elastic/beats/libbeat/common" ) +const ( + ReportingFormatProduction = "production" + ReportingFormatMonitoring = "monitoring" +) + type config struct { // allow for maximum one reporter being configured Reporter common.ConfigNamespace `config:",inline"` From 54dda1e904907fbff83f483ddad40403df149aa0 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 14 Dec 2018 15:49:31 -0800 Subject: [PATCH 17/59] Move format validation check to constructor --- libbeat/monitoring/report/elasticsearch/client.go | 11 +++++++---- .../monitoring/report/elasticsearch/elasticsearch.go | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index d6432c3bf46..db2952b93cd 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -39,12 +39,17 @@ type publishClient struct { func newPublishClient( es *esout.Client, params map[string]string, -) *publishClient { +) (*publishClient, error) { + format := params["_format"] + if format != report.ReportingFormatProduction && format != report.ReportingFormatMonitoring { + return nil, fmt.Errorf("unsupported reporting format: %v", format) + } + p := &publishClient{ es: es, params: params, } - return p + return p, nil } func (c *publishClient) Connect() error { @@ -127,8 +132,6 @@ func (c *publishClient) Publish(batch publisher.Batch) error { err = c.bulkToProduction(params, event, t) case report.ReportingFormatMonitoring: err = c.bulkToMonitoring(event) - default: - err = fmt.Errorf("unsupported reporting format: %v", params["_format"]) } if err != nil { diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 175e8c4e51c..88d25b58a54 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -340,7 +340,7 @@ func makeClient( return nil, err } - return newPublishClient(esClient, params), nil + return newPublishClient(esClient, params) } func closing(log *logp.Logger, c io.Closer) { From 2f29a50a9983b98d4667d48df032c4410987c560 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 17 Jan 2019 22:14:56 -0800 Subject: [PATCH 18/59] Collect cluster UUID when connection is made with ES output cluster --- libbeat/cmd/instance/beat.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index cdff0e6ae62..a37b1d64328 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -287,6 +287,11 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { return nil, err } + err = b.registerClusterUUIDFetching() + if err != nil { + return nil, err + } + reg := monitoring.Default.GetRegistry("libbeat") if reg == nil { reg = monitoring.Default.NewRegistry("libbeat") @@ -794,8 +799,19 @@ func (b *Beat) createOutput(stats outputs.Observer, cfg common.ConfigNamespace) return outputs.Load(b.index, b.Info, stats, cfg.Name(), cfg.Config()) } +func (b *Beat) registerClusterUUIDFetching() error { + if b.Config.Output.Name() == "elasticsearch" { + callback, err := b.clusterUUIDFetchingCallback() + if err != nil { + return err + } + elasticsearch.RegisterConnectCallback(callback) + } + return nil +} + // Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring -func (b *Beat) clusterUUIDFetchingCalling() (elasticsearch.ConnectCallback, error) { +func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, error) { var response struct { ClusterUUID string `json:"cluster_uuid"` } From 8b84a93a7bab1744098d0f157a34eedcf2fe5509 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 17 Jan 2019 23:04:50 -0800 Subject: [PATCH 19/59] Fix format passing + cluster UUID retrieval from registry --- .../monitoring/report/elasticsearch/client.go | 20 +++++++++++++------ .../monitoring/report/elasticsearch/config.go | 1 + .../report/elasticsearch/elasticsearch.go | 15 ++++++++------ libbeat/monitoring/report/report.go | 2 +- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index db2952b93cd..80cc920cd9d 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -40,11 +40,6 @@ func newPublishClient( es *esout.Client, params map[string]string, ) (*publishClient, error) { - format := params["_format"] - if format != report.ReportingFormatProduction && format != report.ReportingFormatMonitoring { - return nil, fmt.Errorf("unsupported reporting format: %v", format) - } - p := &publishClient{ es: es, params: params, @@ -127,7 +122,20 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } } - switch params["_format"] { + f, err := event.Content.Meta.GetValue("format") + if err != nil { + logp.Err("Format not available in monitoring reported. Please report this error: %s", err) + continue + } + + event.Content.Meta.Delete("format") + format, ok := f.(string) + if !ok { + logp.Err("Format not available in monitoring reported. Please report this error: %s", err) + continue + } + + switch format { case report.ReportingFormatProduction: err = c.bulkToProduction(params, event, t) case report.ReportingFormatMonitoring: diff --git a/libbeat/monitoring/report/elasticsearch/config.go b/libbeat/monitoring/report/elasticsearch/config.go index 995f03bbc30..9ebd1595197 100644 --- a/libbeat/monitoring/report/elasticsearch/config.go +++ b/libbeat/monitoring/report/elasticsearch/config.go @@ -43,6 +43,7 @@ type config struct { BufferSize int `config:"buffer_size"` Tags []string `config:"tags"` Backoff backoff `config:"backoff"` + Format string `config:"format"` } type backoff struct { diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 88d25b58a54..12a73348ba2 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -97,6 +97,7 @@ func defaultConfig(settings report.Settings) config { Init: 1 * time.Second, Max: 60 * time.Second, }, + Format: report.ReportingFormatProduction, } if settings.DefaultUsername != "" { @@ -255,12 +256,12 @@ func (r *reporter) initLoop(c config) { log.Info("Successfully connected to X-Pack Monitoring endpoint.") // Start collector and send loop if monitoring endpoint has been found. - go r.snapshotLoop("state", "state", getClusterUUID(), c.StatePeriod) + go r.snapshotLoop("state", "state", c.StatePeriod, c.Format) // For backward compatibility stats is named to metrics. - go r.snapshotLoop("stats", "metrics", getClusterUUID(), c.MetricsPeriod) + go r.snapshotLoop("stats", "metrics", c.MetricsPeriod, c.Format) } -func (r *reporter) snapshotLoop(namespace, prefix, clusterUUID string, period time.Duration) { +func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration, format string) { ticker := time.NewTicker(period) defer ticker.Stop() @@ -297,8 +298,10 @@ func (r *reporter) snapshotLoop(namespace, prefix, clusterUUID string, period ti "interval_ms": int64(period / time.Millisecond), // Converting to seconds as interval only accepts `s` as unit "params": map[string]string{"interval": strconv.Itoa(int(period/time.Second)) + "s"}, + "format": format, } + clusterUUID := getClusterUUID() if clusterUUID != "" { meta.Put("cluster_uuid", clusterUUID) } @@ -382,11 +385,11 @@ func getClusterUUID() string { return "" } - elasticsearchRegistry := outputsRegistry.GetRegistry("elasticearch") + elasticsearchRegistry := outputsRegistry.GetRegistry("elasticsearch") if elasticsearchRegistry == nil { return "" } - // FIXME: this isn't going to work and probably needs to be accessed via a visitor - return monitoring.NewString(elasticsearchRegistry, "cluster_uuid").Get() + snapshot := monitoring.CollectFlatSnapshot(elasticsearchRegistry, monitoring.Full, false) + return snapshot.Strings["cluster_uuid"] } diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index 7e7ee575432..cba4f36b5e2 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -119,7 +119,7 @@ func getReporterConfig( if err != nil { return "", nil, err } - rc.SetString("parameters._format", -1, format) + rc.SetString("format", -1, format) return name, rc, nil } From d3787d9dde2d78fc7b39a42c40241f2395ea96de Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 18 Jan 2019 00:06:09 -0800 Subject: [PATCH 20/59] Fixing error message --- libbeat/cmd/instance/beat.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index a37b1d64328..3bb4f15ee72 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -999,10 +999,10 @@ func initPaths(cfg *common.Config) error { func selectMonitoringConfig(beatCfg beatConfig) (*common.Config, error) { switch { case beatCfg.Monitoring.Enabled() && beatCfg.XPackMonitoring.Enabled(): - const errMonitoringBothConfigEnabled = "both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.hosts to monitoring cluster hosts" + const errMonitoringBothConfigEnabled = "both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.elasticsearch.hosts to monitoring cluster hosts" return nil, errors.New(errMonitoringBothConfigEnabled) case beatCfg.XPackMonitoring.Enabled(): - const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.hosts to monitoring cluster hosts" + const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.elasticsearch.hosts to monitoring cluster hosts" cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) monitoringCfg := beatCfg.XPackMonitoring monitoringCfg.SetString("_format", -1, report.ReportingFormatProduction) From 096c7d5718cb8a97484dae3faf4f57a015bf3656 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 18 Jan 2019 00:07:33 -0800 Subject: [PATCH 21/59] Pass down format correctly --- .../monitoring/report/elasticsearch/client.go | 1 + .../monitoring/report/elasticsearch/config.go | 2 +- libbeat/monitoring/report/report.go | 16 +++++++++------- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 80cc920cd9d..d4d9b360580 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -135,6 +135,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error { continue } + logp.Info("Sending monitoring data to %s cluster", format) switch format { case report.ReportingFormatProduction: err = c.bulkToProduction(params, event, t) diff --git a/libbeat/monitoring/report/elasticsearch/config.go b/libbeat/monitoring/report/elasticsearch/config.go index 9ebd1595197..609bbb677d3 100644 --- a/libbeat/monitoring/report/elasticsearch/config.go +++ b/libbeat/monitoring/report/elasticsearch/config.go @@ -43,7 +43,7 @@ type config struct { BufferSize int `config:"buffer_size"` Tags []string `config:"tags"` Backoff backoff `config:"backoff"` - Format string `config:"format"` + Format string `config:"_format"` } type backoff struct { diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index cba4f36b5e2..13888073811 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -87,6 +87,11 @@ func getReporterConfig( return "", nil, err } + format, err := monitoringConfig.String("_format", -1) + if err != nil { + return "", nil, err + } + // load reporter from `monitoring` section and optionally // merge with output settings if config.Reporter.IsSet() { @@ -101,7 +106,7 @@ func getReporterConfig( }{} rc.Unpack(&hosts) - if len(hosts.Hosts) > 0 { + if format == ReportingFormatProduction && len(hosts.Hosts) > 0 { pathMonHosts := rc.PathOf("hosts") pathOutHost := outCfg.PathOf("hosts") err := fmt.Errorf("'%v' and '%v' are configured", pathMonHosts, pathOutHost) @@ -115,12 +120,7 @@ func getReporterConfig( rc = merged } - format, err := monitoringConfig.String("_format", -1) - if err != nil { - return "", nil, err - } - rc.SetString("format", -1, format) - + rc.SetString("_format", -1, format) return name, rc, nil } @@ -128,6 +128,8 @@ func getReporterConfig( if outputs.IsSet() { name := outputs.Name() if reportFactories[name] != nil { + outCfg := outputs.Config() + outCfg.SetString("_format", -1, format) return name, outputs.Config(), nil } } From 34a9823cd7adc042b35e76c9fdcedab4ef10c626 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 18 Jan 2019 00:46:15 -0800 Subject: [PATCH 22/59] Changing log level for recurring message --- libbeat/monitoring/report/elasticsearch/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index d4d9b360580..c2cb125ab40 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -135,7 +135,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error { continue } - logp.Info("Sending monitoring data to %s cluster", format) + logp.Debug("Sending monitoring data to %s cluster", format) switch format { case report.ReportingFormatProduction: err = c.bulkToProduction(params, event, t) From af85f4b818174bc1c6d9f2ba22e63bb84e5492da Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 21 Jan 2019 04:39:32 -0800 Subject: [PATCH 23/59] Fixing monitoring index name generation + doc fields --- .../monitoring/report/elasticsearch/client.go | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index c2cb125ab40..861fc38ab33 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -20,6 +20,9 @@ package elasticsearch import ( "encoding/json" "fmt" + "time" + + "github.com/pkg/errors" "github.com/pkg/errors" @@ -185,11 +188,10 @@ func (c *publishClient) bulkToProduction(params map[string]string, event publish return err } -// TODO: figure out why this isn't actually indexing anything! func (c *publishClient) bulkToMonitoring(event publisher.Event) error { action := common.MapStr{ "index": common.MapStr{ - "_index": "monitoring-beats-6-1", // FIXME + "_index": getMonitoringIndexName(), "_routing": nil, }, } @@ -200,15 +202,48 @@ func (c *publishClient) bulkToMonitoring(event publisher.Event) error { action.Put("index._type", "doc") } + event.Content.Fields.Put("timestamp", event.Content.Timestamp) + + t, err := event.Content.Meta.GetValue("type") + if err != nil { + return errors.Wrap(err, "could not determine type field") + } + tStr, ok := t.(string) + if !ok { + return fmt.Errorf("type is not string") + } + fields := common.MapStr{ + "type": tStr, + tStr: event.Content.Fields, + } + + interval, err := event.Content.Meta.GetValue("interval_ms") + if err != nil { + return errors.Wrap(err, "could not determine interval_ms field") + } + fields.Put("interval_ms", interval) + + clusterUUID, err := event.Content.Meta.GetValue("cluster_uuid") + if err != nil && err != common.ErrKeyNotFound { + return errors.Wrap(err, "could not determine cluster_uuid field") + } + fields.Put("cluster_uuid", clusterUUID) + document := report.Event{ Timestamp: event.Content.Timestamp, - Fields: event.Content.Fields, + Fields: fields, } bulk := [2]interface{}{action, document} // Currently one request per event is sent. Reason is that each event can contain different // interval params and X-Pack requires to send the interval param. // FIXME: index name (first param below) - _, err := c.es.BulkWith("monitoring-beats-6-1", "doc", nil, nil, bulk[:]) + _, err = c.es.BulkWith(getMonitoringIndexName(), "doc", nil, nil, bulk[:]) return err } + +func getMonitoringIndexName() string { + version := 6 + date := time.Now().Format("2006.01.02") + return fmt.Sprintf(".monitoring-beats-%v-%s", version, date) +} From d9993d21476fff1a1f7c9842fc8435f01acb984d Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 23 Jan 2019 03:31:47 -0800 Subject: [PATCH 24/59] Removing line from vestigial implementation --- libbeat/monitoring/report/elasticsearch/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 861fc38ab33..6ca1037e17d 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -87,7 +87,6 @@ func (c *publishClient) Connect() error { } debugf("XPack monitoring is enabled") - debugf("Publishing to %v cluster", c.params["_format"]) return nil } From 98ebe0e96306151313ef1eeed2ae9c6dd0942ef1 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 23 Jan 2019 03:36:14 -0800 Subject: [PATCH 25/59] Removing unnecessary else --- libbeat/cmd/instance/beat.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 3bb4f15ee72..f8cbcb5b57e 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -369,9 +369,12 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { return err } - if monitoringCfg, err := selectMonitoringConfig(b.Config); err != nil { + monitoringCfg, err := selectMonitoringConfig(b.Config) + if err != nil { return err - } else if monitoringCfg.Enabled() { + } + + if monitoringCfg.Enabled() { settings := report.Settings{ DefaultUsername: settings.Monitoring.DefaultUsername, } From 7e568bf8a7f6d351fca071be7ea63340f6446f11 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 23 Jan 2019 04:08:06 -0800 Subject: [PATCH 26/59] Use iota for reporting format constants --- libbeat/cmd/instance/beat.go | 4 +-- .../monitoring/report/elasticsearch/client.go | 19 ++++++++--- .../monitoring/report/elasticsearch/config.go | 34 ++++++++++--------- .../report/elasticsearch/elasticsearch.go | 4 +-- libbeat/monitoring/report/report.go | 16 +++++---- 5 files changed, 47 insertions(+), 30 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index f8cbcb5b57e..9209e41a136 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -1008,11 +1008,11 @@ func selectMonitoringConfig(beatCfg beatConfig) (*common.Config, error) { const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.elasticsearch.hosts to monitoring cluster hosts" cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) monitoringCfg := beatCfg.XPackMonitoring - monitoringCfg.SetString("_format", -1, report.ReportingFormatProduction) + monitoringCfg.SetInt("_format", -1, int64(report.ReportingFormatXPackMonitoringBulk)) return monitoringCfg, nil case beatCfg.Monitoring.Enabled(): monitoringCfg := beatCfg.Monitoring - monitoringCfg.SetString("_format", -1, report.ReportingFormatMonitoring) + monitoringCfg.SetInt("_format", -1, int64(report.ReportingFormatBulk)) return monitoringCfg, nil default: return nil, nil diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 6ca1037e17d..69b4bce64d0 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -131,17 +131,17 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } event.Content.Meta.Delete("format") - format, ok := f.(string) + format, ok := f.(report.ReportingFormat) if !ok { logp.Err("Format not available in monitoring reported. Please report this error: %s", err) continue } - logp.Debug("Sending monitoring data to %s cluster", format) + logp.Info("Sending monitoring data to %s cluster", getClusterTypeForFormat(format)) switch format { - case report.ReportingFormatProduction: + case report.ReportingFormatBulk: err = c.bulkToProduction(params, event, t) - case report.ReportingFormatMonitoring: + case report.ReportingFormatXPackMonitoringBulk: err = c.bulkToMonitoring(event) } @@ -246,3 +246,14 @@ func getMonitoringIndexName() string { date := time.Now().Format("2006.01.02") return fmt.Sprintf(".monitoring-beats-%v-%s", version, date) } + +func getClusterTypeForFormat(format report.ReportingFormat) string { + switch format { + case report.ReportingFormatXPackMonitoringBulk: + return "monitoring" + case report.ReportingFormatBulk: + return "production" + default: + return "invalid" + } +} diff --git a/libbeat/monitoring/report/elasticsearch/config.go b/libbeat/monitoring/report/elasticsearch/config.go index 609bbb677d3..0a232817bea 100644 --- a/libbeat/monitoring/report/elasticsearch/config.go +++ b/libbeat/monitoring/report/elasticsearch/config.go @@ -20,6 +20,8 @@ package elasticsearch import ( "time" + "github.com/elastic/beats/libbeat/monitoring/report" + "github.com/elastic/beats/libbeat/common/transport/tlscommon" ) @@ -28,22 +30,22 @@ import ( type config struct { Hosts []string Protocol string - Params map[string]string `config:"parameters"` - Headers map[string]string `config:"headers"` - Username string `config:"username"` - Password string `config:"password"` - ProxyURL string `config:"proxy_url"` - CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` - TLS *tlscommon.Config `config:"ssl"` - MaxRetries int `config:"max_retries"` - Timeout time.Duration `config:"timeout"` - MetricsPeriod time.Duration `config:"metrics.period"` - StatePeriod time.Duration `config:"state.period"` - BulkMaxSize int `config:"bulk_max_size" validate:"min=0"` - BufferSize int `config:"buffer_size"` - Tags []string `config:"tags"` - Backoff backoff `config:"backoff"` - Format string `config:"_format"` + Params map[string]string `config:"parameters"` + Headers map[string]string `config:"headers"` + Username string `config:"username"` + Password string `config:"password"` + ProxyURL string `config:"proxy_url"` + CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` + TLS *tlscommon.Config `config:"ssl"` + MaxRetries int `config:"max_retries"` + Timeout time.Duration `config:"timeout"` + MetricsPeriod time.Duration `config:"metrics.period"` + StatePeriod time.Duration `config:"state.period"` + BulkMaxSize int `config:"bulk_max_size" validate:"min=0"` + BufferSize int `config:"buffer_size"` + Tags []string `config:"tags"` + Backoff backoff `config:"backoff"` + Format report.ReportingFormat `config:"_format"` } type backoff struct { diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 12a73348ba2..a1dc7c0eb5b 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -97,7 +97,7 @@ func defaultConfig(settings report.Settings) config { Init: 1 * time.Second, Max: 60 * time.Second, }, - Format: report.ReportingFormatProduction, + Format: report.ReportingFormatXPackMonitoringBulk, } if settings.DefaultUsername != "" { @@ -261,7 +261,7 @@ func (r *reporter) initLoop(c config) { go r.snapshotLoop("stats", "metrics", c.MetricsPeriod, c.Format) } -func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration, format string) { +func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration, format report.ReportingFormat) { ticker := time.NewTicker(period) defer ticker.Stop() diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index 13888073811..d981876f361 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -25,9 +25,12 @@ import ( "github.com/elastic/beats/libbeat/common" ) +type ReportingFormat int + const ( - ReportingFormatProduction = "production" - ReportingFormatMonitoring = "monitoring" + ReportingFormatUnknown ReportingFormat = iota + ReportingFormatXPackMonitoringBulk + ReportingFormatBulk ) type config struct { @@ -87,10 +90,11 @@ func getReporterConfig( return "", nil, err } - format, err := monitoringConfig.String("_format", -1) + f, err := monitoringConfig.Int("_format", -1) if err != nil { return "", nil, err } + format := ReportingFormat(f) // load reporter from `monitoring` section and optionally // merge with output settings @@ -106,7 +110,7 @@ func getReporterConfig( }{} rc.Unpack(&hosts) - if format == ReportingFormatProduction && len(hosts.Hosts) > 0 { + if format == ReportingFormatBulk && len(hosts.Hosts) > 0 { pathMonHosts := rc.PathOf("hosts") pathOutHost := outCfg.PathOf("hosts") err := fmt.Errorf("'%v' and '%v' are configured", pathMonHosts, pathOutHost) @@ -120,7 +124,7 @@ func getReporterConfig( rc = merged } - rc.SetString("_format", -1, format) + rc.SetInt("_format", -1, int64(format)) return name, rc, nil } @@ -129,7 +133,7 @@ func getReporterConfig( name := outputs.Name() if reportFactories[name] != nil { outCfg := outputs.Config() - outCfg.SetString("_format", -1, format) + outCfg.SetInt("_format", -1, int64(format)) return name, outputs.Config(), nil } } From ab30bb8a3c8a86cfc5d440e7dd8779c761122355 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 23 Jan 2019 04:37:16 -0800 Subject: [PATCH 27/59] Better passing of format + validation in constructor --- .../monitoring/report/elasticsearch/client.go | 40 +++++-------------- .../report/elasticsearch/elasticsearch.go | 14 ++++--- libbeat/monitoring/report/report.go | 2 +- 3 files changed, 19 insertions(+), 37 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 69b4bce64d0..f6bf959bd80 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -37,15 +37,18 @@ import ( type publishClient struct { es *esout.Client params map[string]string + format report.ReportingFormat } func newPublishClient( es *esout.Client, params map[string]string, + format report.ReportingFormat, ) (*publishClient, error) { p := &publishClient{ es: es, params: params, + format: format, } return p, nil } @@ -124,25 +127,11 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } } - f, err := event.Content.Meta.GetValue("format") - if err != nil { - logp.Err("Format not available in monitoring reported. Please report this error: %s", err) - continue - } - - event.Content.Meta.Delete("format") - format, ok := f.(report.ReportingFormat) - if !ok { - logp.Err("Format not available in monitoring reported. Please report this error: %s", err) - continue - } - - logp.Info("Sending monitoring data to %s cluster", getClusterTypeForFormat(format)) - switch format { - case report.ReportingFormatBulk: - err = c.bulkToProduction(params, event, t) + switch c.format { case report.ReportingFormatXPackMonitoringBulk: - err = c.bulkToMonitoring(event) + err = c.publishWithXPackMonitoringBulk(params, event, t) + case report.ReportingFormatBulk: + err = c.publishWithBulk(event) } if err != nil { @@ -167,7 +156,7 @@ func (c *publishClient) String() string { return "publish(" + c.es.String() + ")" } -func (c *publishClient) bulkToProduction(params map[string]string, event publisher.Event, docType interface{}) error { +func (c *publishClient) publishWithXPackMonitoringBulk(params map[string]string, event publisher.Event, t interface{}) error { meta := common.MapStr{ "_index": "", "_routing": nil, @@ -187,7 +176,7 @@ func (c *publishClient) bulkToProduction(params map[string]string, event publish return err } -func (c *publishClient) bulkToMonitoring(event publisher.Event) error { +func (c *publishClient) publishWithBulk(event publisher.Event) error { action := common.MapStr{ "index": common.MapStr{ "_index": getMonitoringIndexName(), @@ -246,14 +235,3 @@ func getMonitoringIndexName() string { date := time.Now().Format("2006.01.02") return fmt.Sprintf(".monitoring-beats-%v-%s", version, date) } - -func getClusterTypeForFormat(format report.ReportingFormat) string { - switch format { - case report.ReportingFormatXPackMonitoringBulk: - return "monitoring" - case report.ReportingFormatBulk: - return "production" - default: - return "invalid" - } -} diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index a1dc7c0eb5b..e17053f7ce5 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -19,6 +19,7 @@ package elasticsearch import ( "errors" + "fmt" "io" "math/rand" "net/url" @@ -256,12 +257,12 @@ func (r *reporter) initLoop(c config) { log.Info("Successfully connected to X-Pack Monitoring endpoint.") // Start collector and send loop if monitoring endpoint has been found. - go r.snapshotLoop("state", "state", c.StatePeriod, c.Format) + go r.snapshotLoop("state", "state", c.StatePeriod) // For backward compatibility stats is named to metrics. - go r.snapshotLoop("stats", "metrics", c.MetricsPeriod, c.Format) + go r.snapshotLoop("stats", "metrics", c.MetricsPeriod) } -func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration, format report.ReportingFormat) { +func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration) { ticker := time.NewTicker(period) defer ticker.Stop() @@ -298,7 +299,6 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration, "interval_ms": int64(period / time.Millisecond), // Converting to seconds as interval only accepts `s` as unit "params": map[string]string{"interval": strconv.Itoa(int(period/time.Second)) + "s"}, - "format": format, } clusterUUID := getClusterUUID() @@ -343,7 +343,11 @@ func makeClient( return nil, err } - return newPublishClient(esClient, params) + if config.Format != report.ReportingFormatXPackMonitoringBulk && config.Format != report.ReportingFormatBulk { + return nil, fmt.Errorf("unknown reporting format: %v", config.Format) + } + + return newPublishClient(esClient, params, config.Format) } func closing(log *logp.Logger, c io.Closer) { diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index d981876f361..a9379567a9c 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -28,7 +28,7 @@ import ( type ReportingFormat int const ( - ReportingFormatUnknown ReportingFormat = iota + ReportingFormatUnknown ReportingFormat = iota // to protect against zero-value errors ReportingFormatXPackMonitoringBulk ReportingFormatBulk ) From 988555a4ca3e7f3646b1d02953ffc8f9e8ff6c3e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sat, 2 Feb 2019 03:39:45 -0800 Subject: [PATCH 28/59] Adding CHANGELOG entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fe7064ed32b..c1ad64017f8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - New processor: `copy_fields`. {pull}11303[11303] - Add `error.message` to events when `fail_on_error` is set in `rename` and `copy_fields` processors. {pull}11303[11303] - New processor: `truncate_fields`. {pull}11297[11297] +- Allow a beat to ship monitoring data directly to an Elasticsearch monitoring clsuter. {pull}9260[9260] *Auditbeat* From 6cbd5f596dbfe2aa1051f24cf1f53ebee0ad93ad Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 8 Feb 2019 21:18:25 -0800 Subject: [PATCH 29/59] Adding system test for direct monitoring --- libbeat/tests/system/config/mockbeat.yml.j2 | 7 +++++ libbeat/tests/system/test_monitoring.py | 32 +++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/libbeat/tests/system/config/mockbeat.yml.j2 b/libbeat/tests/system/config/mockbeat.yml.j2 index d714bd74d84..643b2e09999 100644 --- a/libbeat/tests/system/config/mockbeat.yml.j2 +++ b/libbeat/tests/system/config/mockbeat.yml.j2 @@ -109,5 +109,12 @@ xpack.monitoring.elasticsearch.metrics.period: 2s # to speed up tests xpack.monitoring.elasticsearch.state.period: 3s # to speed up tests {% endif -%} +{% if monitoring -%} +#================================ X-Pack Monitoring (direct) ===================================== +monitoring.elasticsearch.hosts: {{xpack.monitoring.elasticsearch.hosts}} +monitoring.elasticsearch.metrics.period: 2s # to speed up tests +monitoring.elasticsearch.state.period: 3s # to speed up tests +{% endif -%} + # vim: set ft=jinja: diff --git a/libbeat/tests/system/test_monitoring.py b/libbeat/tests/system/test_monitoring.py index 17ee66dbd5f..46b6fe824c8 100644 --- a/libbeat/tests/system/test_monitoring.py +++ b/libbeat/tests/system/test_monitoring.py @@ -49,6 +49,38 @@ def test_via_output_cluster(self): field_names = ['cluster_uuid', 'timestamp', 'interval_ms', 'type', 'source_node', monitoring_doc_type] self.assert_monitoring_doc_contains_fields(monitoring_doc_type, field_names) + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_direct_to_monitoring_cluster(self): + """ + Test shipping monitoring data directly to the monitoring cluster. + Make sure expected documents are indexed in monitoring cluster. + """ + + self.render_config_template( + "mockbeat", + monitoring={ + "elasticsearch": { + "hosts": [self.get_elasticsearch_monitoring_url()] + } + } + ) + + self.clean() + + proc = self.start_beat(config="mockbeat.yml") + self.wait_until(lambda: self.log_contains("mockbeat start running.")) + self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) + self.wait_until(lambda: self.log_contains(re.compile( + "Connection to .*elasticsearch\("+self.get_elasticsearch_monitoring_url()+"\).* established"))) + self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) + self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + + for monitoring_doc_type in ['beats_stats', 'beats_state']: + field_names = ['cluster_uuid', 'timestamp', 'interval_ms', 'type', monitoring_doc_type] + self.assert_monitoring_doc_contains_fields(monitoring_doc_type, field_names) + + def monitoring_doc_exists(self, monitoring_type): results = self.es_monitoring.search( index='.monitoring-beats-*', From 97a1d313d3e5d94615735af08d4b1973db62ad0c Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 8 Feb 2019 21:25:02 -0800 Subject: [PATCH 30/59] Refactoring --- libbeat/tests/system/test_monitoring.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/libbeat/tests/system/test_monitoring.py b/libbeat/tests/system/test_monitoring.py index 46b6fe824c8..449fdc02fd9 100644 --- a/libbeat/tests/system/test_monitoring.py +++ b/libbeat/tests/system/test_monitoring.py @@ -16,6 +16,10 @@ def setUp(self): self.es = Elasticsearch([self.get_elasticsearch_url()]) self.es_monitoring = Elasticsearch([self.get_elasticsearch_monitoring_url()]) + def tearDown(self): + if self.proc: + self.proc.check_kill_and_wait() + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") @attr('integration') def test_via_output_cluster(self): @@ -35,9 +39,10 @@ def test_via_output_cluster(self): } ) - self.clean() + self.clean_output_cluster() + self.clean_monitoring_cluster() - proc = self.start_beat(config="mockbeat.yml") + self.proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) self.wait_until(lambda: self.log_contains(re.compile( @@ -66,9 +71,9 @@ def test_direct_to_monitoring_cluster(self): } ) - self.clean() + self.clean_monitoring_cluster() - proc = self.start_beat(config="mockbeat.yml") + self.proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) self.wait_until(lambda: self.log_contains(re.compile( @@ -80,7 +85,6 @@ def test_direct_to_monitoring_cluster(self): field_names = ['cluster_uuid', 'timestamp', 'interval_ms', 'type', monitoring_doc_type] self.assert_monitoring_doc_contains_fields(monitoring_doc_type, field_names) - def monitoring_doc_exists(self, monitoring_type): results = self.es_monitoring.search( index='.monitoring-beats-*', @@ -102,7 +106,7 @@ def assert_monitoring_doc_contains_fields(self, monitoring_type, field_names): for field_name in field_names: assert field_name in source - def clean(self): + def clean_output_cluster(self): # Setup remote exporter self.es.cluster.put_settings(body={ "transient": { @@ -120,6 +124,7 @@ def clean(self): } }) + def clean_monitoring_cluster(self): # Delete any old beats monitoring data self.es_monitoring.indices.delete(index=".monitoring-beats-*", ignore=[404]) From 0d7d291f241e3672f474dc7837deb70450c3797e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 8 Feb 2019 21:30:27 -0800 Subject: [PATCH 31/59] Adding skeleton comparison test --- libbeat/tests/system/test_monitoring.py | 59 +++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/libbeat/tests/system/test_monitoring.py b/libbeat/tests/system/test_monitoring.py index 449fdc02fd9..73cfaa24292 100644 --- a/libbeat/tests/system/test_monitoring.py +++ b/libbeat/tests/system/test_monitoring.py @@ -85,6 +85,65 @@ def test_direct_to_monitoring_cluster(self): field_names = ['cluster_uuid', 'timestamp', 'interval_ms', 'type', monitoring_doc_type] self.assert_monitoring_doc_contains_fields(monitoring_doc_type, field_names) + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_compare(self): + """ + Test that monitoring docs are the same, regardless of how they are shipped. + """ + + self.render_config_template( + "mockbeat", + xpack={ + "monitoring": { + "elasticsearch": { + "hosts": [self.get_elasticsearch_url()] + } + } + } + ) + + self.clean_output_cluster() + self.clean_monitoring_cluster() + + self.proc = self.start_beat(config="mockbeat.yml") + self.wait_until(lambda: self.log_contains("mockbeat start running.")) + self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) + self.wait_until(lambda: self.log_contains(re.compile( + "Connection to .*elasticsearch\("+self.get_elasticsearch_url()+"\).* established"))) + self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) + self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + + # TODO: Get a beats_stats doc + # TODO: Get a beats_state doc + + self.proc.check_kill_and_wait() + + self.render_config_template( + "mockbeat", + monitoring={ + "elasticsearch": { + "hosts": [self.get_elasticsearch_monitoring_url()] + } + } + ) + + self.clean_monitoring_cluster() + + self.proc = self.start_beat(config="mockbeat.yml") + self.wait_until(lambda: self.log_contains("mockbeat start running.")) + self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) + self.wait_until(lambda: self.log_contains(re.compile( + "Connection to .*elasticsearch\("+self.get_elasticsearch_monitoring_url()+"\).* established"))) + self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) + self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + + # TODO: Get a beats_stats doc + # TODO: Get a beats_state doc + + # TODO: compare the two beats_stats docs, making sure same keys exist under `beats_stats` field + # TODO: compare the two beats_state docs, making sure same keys exist under `beats_state` field + def monitoring_doc_exists(self, monitoring_type): results = self.es_monitoring.search( index='.monitoring-beats-*', From e0016636e103cdf89360eeebb9b1180fa5efe8f9 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 8 Feb 2019 21:32:29 -0800 Subject: [PATCH 32/59] Refactoring: renaming --- libbeat/tests/system/test_monitoring.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/libbeat/tests/system/test_monitoring.py b/libbeat/tests/system/test_monitoring.py index 73cfaa24292..6f33a95e2ab 100644 --- a/libbeat/tests/system/test_monitoring.py +++ b/libbeat/tests/system/test_monitoring.py @@ -39,8 +39,8 @@ def test_via_output_cluster(self): } ) - self.clean_output_cluster() - self.clean_monitoring_cluster() + self.init_output_cluster() + self.init_monitoring_cluster() self.proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) @@ -71,7 +71,7 @@ def test_direct_to_monitoring_cluster(self): } ) - self.clean_monitoring_cluster() + self.init_monitoring_cluster() self.proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) @@ -103,8 +103,8 @@ def test_compare(self): } ) - self.clean_output_cluster() - self.clean_monitoring_cluster() + self.init_output_cluster() + self.init_monitoring_cluster() self.proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) @@ -128,7 +128,7 @@ def test_compare(self): } ) - self.clean_monitoring_cluster() + self.init_monitoring_cluster() self.proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) @@ -165,7 +165,7 @@ def assert_monitoring_doc_contains_fields(self, monitoring_type, field_names): for field_name in field_names: assert field_name in source - def clean_output_cluster(self): + def init_output_cluster(self): # Setup remote exporter self.es.cluster.put_settings(body={ "transient": { @@ -183,7 +183,7 @@ def clean_output_cluster(self): } }) - def clean_monitoring_cluster(self): + def init_monitoring_cluster(self): # Delete any old beats monitoring data self.es_monitoring.indices.delete(index=".monitoring-beats-*", ignore=[404]) From 252ec04bf5a9f33c51cb7396269424b52e4ee1fe Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 8 Feb 2019 22:33:07 -0800 Subject: [PATCH 33/59] Refactoring --- libbeat/tests/system/test_monitoring.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/libbeat/tests/system/test_monitoring.py b/libbeat/tests/system/test_monitoring.py index 6f33a95e2ab..0e6d7e97d6b 100644 --- a/libbeat/tests/system/test_monitoring.py +++ b/libbeat/tests/system/test_monitoring.py @@ -16,10 +16,6 @@ def setUp(self): self.es = Elasticsearch([self.get_elasticsearch_url()]) self.es_monitoring = Elasticsearch([self.get_elasticsearch_monitoring_url()]) - def tearDown(self): - if self.proc: - self.proc.check_kill_and_wait() - @unittest.skipUnless(INTEGRATION_TESTS, "integration test") @attr('integration') def test_via_output_cluster(self): @@ -42,7 +38,7 @@ def test_via_output_cluster(self): self.init_output_cluster() self.init_monitoring_cluster() - self.proc = self.start_beat(config="mockbeat.yml") + proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) self.wait_until(lambda: self.log_contains(re.compile( @@ -50,6 +46,8 @@ def test_via_output_cluster(self): self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + proc.check_kill_and_wait() + for monitoring_doc_type in ['beats_stats', 'beats_state']: field_names = ['cluster_uuid', 'timestamp', 'interval_ms', 'type', 'source_node', monitoring_doc_type] self.assert_monitoring_doc_contains_fields(monitoring_doc_type, field_names) @@ -73,7 +71,7 @@ def test_direct_to_monitoring_cluster(self): self.init_monitoring_cluster() - self.proc = self.start_beat(config="mockbeat.yml") + proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) self.wait_until(lambda: self.log_contains(re.compile( @@ -81,6 +79,8 @@ def test_direct_to_monitoring_cluster(self): self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + proc.check_kill_and_wait() + for monitoring_doc_type in ['beats_stats', 'beats_state']: field_names = ['cluster_uuid', 'timestamp', 'interval_ms', 'type', monitoring_doc_type] self.assert_monitoring_doc_contains_fields(monitoring_doc_type, field_names) @@ -106,7 +106,7 @@ def test_compare(self): self.init_output_cluster() self.init_monitoring_cluster() - self.proc = self.start_beat(config="mockbeat.yml") + proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) self.wait_until(lambda: self.log_contains(re.compile( @@ -114,11 +114,11 @@ def test_compare(self): self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + proc.check_kill_and_wait() + # TODO: Get a beats_stats doc # TODO: Get a beats_state doc - self.proc.check_kill_and_wait() - self.render_config_template( "mockbeat", monitoring={ @@ -130,7 +130,7 @@ def test_compare(self): self.init_monitoring_cluster() - self.proc = self.start_beat(config="mockbeat.yml") + proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) self.wait_until(lambda: self.log_contains(re.compile( @@ -138,6 +138,8 @@ def test_compare(self): self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + proc.check_kill_and_wait() + # TODO: Get a beats_stats doc # TODO: Get a beats_state doc From 1e1a380ab91471fefdb2def09112039d832b81e9 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sat, 9 Feb 2019 06:44:44 -0800 Subject: [PATCH 34/59] Better cleanup --- libbeat/tests/system/test_monitoring.py | 39 ++++++++++++++++++------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/libbeat/tests/system/test_monitoring.py b/libbeat/tests/system/test_monitoring.py index 0e6d7e97d6b..0f175785310 100644 --- a/libbeat/tests/system/test_monitoring.py +++ b/libbeat/tests/system/test_monitoring.py @@ -35,14 +35,15 @@ def test_via_output_cluster(self): } ) + self.clean_output_cluster() + self.clean_monitoring_cluster() self.init_output_cluster() - self.init_monitoring_cluster() proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) self.wait_until(lambda: self.log_contains(re.compile( - "Connection to .*elasticsearch\("+self.get_elasticsearch_url()+"\).* established"))) + "Connection to .*elasticsearch\("+self. ()+"\).* established"))) self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) @@ -69,7 +70,8 @@ def test_direct_to_monitoring_cluster(self): } ) - self.init_monitoring_cluster() + self.clean_output_cluster() + self.clean_monitoring_cluster() proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) @@ -103,8 +105,9 @@ def test_compare(self): } ) + self.clean_output_cluster() + self.clean_monitoring_cluster() self.init_output_cluster() - self.init_monitoring_cluster() proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) @@ -128,7 +131,8 @@ def test_compare(self): } ) - self.init_monitoring_cluster() + self.clean_output_cluster() + self.clean_monitoring_cluster() proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) @@ -140,7 +144,7 @@ def test_compare(self): proc.check_kill_and_wait() - # TODO: Get a beats_stats doc + # TODO: Get a beats_stats doc # TODO: Get a beats_state doc # TODO: compare the two beats_stats docs, making sure same keys exist under `beats_stats` field @@ -167,6 +171,25 @@ def assert_monitoring_doc_contains_fields(self, monitoring_type, field_names): for field_name in field_names: assert field_name in source + def clean_output_cluster(self): + # Remove all exporters + self.es.cluster.put_settings(body={ + "transient": { + "xpack.monitoring.exporters.*": None + } + }) + + # Disable collection + self.es.cluster.put_settings(body={ + "transient": { + "xpack.monitoring.collection.enabled": None + } + }) + + def clean_monitoring_cluster(self): + # Delete any old beats monitoring data + self.es_monitoring.indices.delete(index=".monitoring-beats-*", ignore=[404]) + def init_output_cluster(self): # Setup remote exporter self.es.cluster.put_settings(body={ @@ -185,10 +208,6 @@ def init_output_cluster(self): } }) - def init_monitoring_cluster(self): - # Delete any old beats monitoring data - self.es_monitoring.indices.delete(index=".monitoring-beats-*", ignore=[404]) - def get_elasticsearch_monitoring_url(self): return "http://{host}:{port}".format( host=os.getenv("ES_MONITORING_HOST", "localhost"), From 8d2b706dad28a7060c7d0d746f22263b1028cc45 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sat, 9 Feb 2019 06:50:44 -0800 Subject: [PATCH 35/59] Fixing formatting --- libbeat/tests/system/test_monitoring.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/tests/system/test_monitoring.py b/libbeat/tests/system/test_monitoring.py index 0f175785310..c21f796b85e 100644 --- a/libbeat/tests/system/test_monitoring.py +++ b/libbeat/tests/system/test_monitoring.py @@ -144,7 +144,7 @@ def test_compare(self): proc.check_kill_and_wait() - # TODO: Get a beats_stats doc + # TODO: Get a beats_stats doc # TODO: Get a beats_state doc # TODO: compare the two beats_stats docs, making sure same keys exist under `beats_stats` field From 83f8d6607f4cf2b7155152156212189918262c28 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sat, 9 Feb 2019 07:20:55 -0800 Subject: [PATCH 36/59] Fixing syntax error --- libbeat/tests/system/test_monitoring.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/tests/system/test_monitoring.py b/libbeat/tests/system/test_monitoring.py index c21f796b85e..035673f85d5 100644 --- a/libbeat/tests/system/test_monitoring.py +++ b/libbeat/tests/system/test_monitoring.py @@ -43,7 +43,7 @@ def test_via_output_cluster(self): self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) self.wait_until(lambda: self.log_contains(re.compile( - "Connection to .*elasticsearch\("+self. ()+"\).* established"))) + "Connection to .*elasticsearch\("+self.get_elasticsearch_url()+"\).* established"))) self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) From f0d108be87df593e8aed816eee0c5bb7791efc2e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Sat, 9 Feb 2019 07:50:34 -0800 Subject: [PATCH 37/59] Fixing variable name in template --- libbeat/tests/system/config/mockbeat.yml.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/tests/system/config/mockbeat.yml.j2 b/libbeat/tests/system/config/mockbeat.yml.j2 index 643b2e09999..55f2ec97d5f 100644 --- a/libbeat/tests/system/config/mockbeat.yml.j2 +++ b/libbeat/tests/system/config/mockbeat.yml.j2 @@ -111,7 +111,7 @@ xpack.monitoring.elasticsearch.state.period: 3s # to speed up tests {% if monitoring -%} #================================ X-Pack Monitoring (direct) ===================================== -monitoring.elasticsearch.hosts: {{xpack.monitoring.elasticsearch.hosts}} +monitoring.elasticsearch.hosts: {{monitoring.elasticsearch.hosts}} monitoring.elasticsearch.metrics.period: 2s # to speed up tests monitoring.elasticsearch.state.period: 3s # to speed up tests {% endif -%} From 91b90712f3ce7667fdbdab1ea94fddd31de089f4 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 12 Feb 2019 10:50:03 -0800 Subject: [PATCH 38/59] Fleshing out TODOs --- libbeat/tests/system/test_monitoring.py | 39 +++++++++++++++++++------ 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/libbeat/tests/system/test_monitoring.py b/libbeat/tests/system/test_monitoring.py index 035673f85d5..1071cb5065d 100644 --- a/libbeat/tests/system/test_monitoring.py +++ b/libbeat/tests/system/test_monitoring.py @@ -119,8 +119,8 @@ def test_compare(self): proc.check_kill_and_wait() - # TODO: Get a beats_stats doc - # TODO: Get a beats_state doc + indirect_beats_stats_doc = self.get_monitoring_doc('beats_stats') + indirect_beats_state_doc = self.get_monitoring_doc('beats_state') self.render_config_template( "mockbeat", @@ -144,21 +144,30 @@ def test_compare(self): proc.check_kill_and_wait() - # TODO: Get a beats_stats doc - # TODO: Get a beats_state doc + direct_beats_stats_doc = self.get_monitoring_doc('beats_stats') + direct_beats_state_doc = self.get_monitoring_doc('beats_state') - # TODO: compare the two beats_stats docs, making sure same keys exist under `beats_stats` field - # TODO: compare the two beats_state docs, making sure same keys exist under `beats_state` field + self.assert_same_structure(indirect_beats_state_doc['beats_state'], direct_beats_state_doc['beats_state']) + self.assert_same_structure(indirect_beats_stats_doc['beats_stats'], direct_beats_stats_doc['beats_stats']) - def monitoring_doc_exists(self, monitoring_type): + def search_monitoring_doc(self, monitoring_type): results = self.es_monitoring.search( index='.monitoring-beats-*', q='type:'+monitoring_type, size=1 ) - hits = results['hits']['hits'] + return results['hits']['hits'] + + def monitoring_doc_exists(self, monitoring_type): + hits = self.search_monitoring_doc(monitoring_type) return len(hits) == 1 + def get_monitoring_doc(self, monitoring_type): + hits = self.search_monitoring_doc(monitoring_type) + if len(hits) != 1: + return None + return hits[0]['_source'] + def assert_monitoring_doc_contains_fields(self, monitoring_type, field_names): results = self.es_monitoring.search( index='.monitoring-beats-*', @@ -169,7 +178,19 @@ def assert_monitoring_doc_contains_fields(self, monitoring_type, field_names): source = hits[0]['_source'] for field_name in field_names: - assert field_name in source + self.assertIn(field_name, source) + + def assert_same_structure(self, dict1, dict2): + dict1_keys = dict1.keys() + dict2_keys = dict2.keys() + + self.assertEqual(len(dict1_keys), len(dict2_keys)) + for key in dict1_keys: + dict1_val = dict1[key] + dict2_val = dict2[key] + self.assertEqual(type(dict1_val), type(dict2_val)) + if type(dict1_val) is dict: + self.assert_same_structure(dict1_val, dict2_val) def clean_output_cluster(self): # Remove all exporters From 2d822b6f37a8a9c3957682c361d629336480f47f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 12 Feb 2019 11:11:28 -0800 Subject: [PATCH 39/59] Make Hound happy (woof woof!) --- libbeat/monitoring/report/report.go | 2 ++ libbeat/outputs/elasticsearch/client.go | 1 + 2 files changed, 3 insertions(+) diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index a9379567a9c..0af15b45dab 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -25,8 +25,10 @@ import ( "github.com/elastic/beats/libbeat/common" ) +// ReportingFormat encodes the type of format to report monitoring data in type ReportingFormat int +// Enumerations of various ReportingFormats const ( ReportingFormatUnknown ReportingFormat = iota // to protect against zero-value errors ReportingFormatXPackMonitoringBulk diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index be23512dea5..03f8dca7e83 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -77,6 +77,7 @@ type ClientSettings struct { Observer outputs.Observer } +// ConnectCallback defines the type for the function to be called when the Elasticsearch client successfully connects to the cluster type ConnectCallback func(client *Client) error // Connection manages the connection for a given client. From ddf98541a79d73f2f9ded8e26a31a69956635acb Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 22 Mar 2019 12:33:49 -0700 Subject: [PATCH 40/59] Fixing rebase errors --- libbeat/monitoring/report/elasticsearch/client.go | 6 ++---- libbeat/outputs/elasticsearch/elasticsearch.go | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index f6bf959bd80..601aaa81d66 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -24,8 +24,6 @@ import ( "github.com/pkg/errors" - "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring/report" @@ -167,12 +165,12 @@ func (c *publishClient) publishWithXPackMonitoringBulk(params map[string]string, report.Event{ Timestamp: event.Content.Timestamp, Fields: event.Content.Fields, - } + }, } // Currently one request per event is sent. Reason is that each event can contain different // interval params and X-Pack requires to send the interval param. - _, err = c.es.SendMonitoringBulk(params, bulk[:]) + _, err := c.es.SendMonitoringBulk(params, bulk[:]) return err } diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 7d20e93cca3..0602ea4e7ca 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -66,7 +66,7 @@ var connectCallbackRegistry = newCallbacksRegistry() var globalCallbackRegistry = newCallbacksRegistry() // RegisterGlobalCallback register a global callbacks. -func RegisterGlobalCallback(callback connectCallback) (uuid.UUID, error) { +func RegisterGlobalCallback(callback ConnectCallback) (uuid.UUID, error) { globalCallbackRegistry.mutex.Lock() defer globalCallbackRegistry.mutex.Unlock() From 8a7c1fbe42aa3fb6bba6ed446f2be797e4a90c7a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 22 Mar 2019 12:54:18 -0700 Subject: [PATCH 41/59] Make monitoring type a string + better variable name --- libbeat/monitoring/report/elasticsearch/client.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 601aaa81d66..08938aaee58 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -102,13 +102,18 @@ func (c *publishClient) Publish(batch publisher.Batch) error { var reason error for _, event := range events { - // Extract time + // Extract type t, err := event.Content.Meta.GetValue("type") if err != nil { logp.Err("Type not available in monitoring reported. Please report this error: %s", err) continue } + typ, ok := t.(string) + if !ok { + logp.Err("monitoring type is not a string") + } + var params = map[string]string{} // Copy params for k, v := range c.params { @@ -127,7 +132,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error { switch c.format { case report.ReportingFormatXPackMonitoringBulk: - err = c.publishWithXPackMonitoringBulk(params, event, t) + err = c.publishWithXPackMonitoringBulk(params, event, typ) case report.ReportingFormatBulk: err = c.publishWithBulk(event) } @@ -154,11 +159,11 @@ func (c *publishClient) String() string { return "publish(" + c.es.String() + ")" } -func (c *publishClient) publishWithXPackMonitoringBulk(params map[string]string, event publisher.Event, t interface{}) error { +func (c *publishClient) publishWithXPackMonitoringBulk(params map[string]string, event publisher.Event, typ string) error { meta := common.MapStr{ "_index": "", "_routing": nil, - "_type": t, + "_type": typ, } bulk := [2]interface{}{ common.MapStr{"index": meta}, From f76b536d7d407809ea8242191cee208292220595 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 22 Mar 2019 12:56:44 -0700 Subject: [PATCH 42/59] Change major version check --- libbeat/monitoring/report/elasticsearch/client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 08938aaee58..7773e867894 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -187,9 +187,7 @@ func (c *publishClient) publishWithBulk(event publisher.Event) error { }, } - esVersion := c.es.GetVersion() - v7 := common.MustNewVersion("7.0.0") - if esVersion.LessThan(v7) { + if c.es.GetVersion().Major < 7 { action.Put("index._type", "doc") } From 66859658227b0713447fa74dbbf34ecc60e415ae Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 22 Mar 2019 13:01:03 -0700 Subject: [PATCH 43/59] Extract meta from action --- libbeat/monitoring/report/elasticsearch/client.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 7773e867894..a96de20fa91 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -180,15 +180,17 @@ func (c *publishClient) publishWithXPackMonitoringBulk(params map[string]string, } func (c *publishClient) publishWithBulk(event publisher.Event) error { - action := common.MapStr{ - "index": common.MapStr{ - "_index": getMonitoringIndexName(), - "_routing": nil, - }, + meta := common.MapStr{ + "_index": getMonitoringIndexName(), + "_routing": nil, } if c.es.GetVersion().Major < 7 { - action.Put("index._type", "doc") + meta["_type"] = "doc" + } + + action := common.MapStr{ + "index": meta, } event.Content.Fields.Put("timestamp", event.Content.Timestamp) From 47a67f29e544a4f656efaa0f2370162f4267cca0 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 22 Mar 2019 13:18:27 -0700 Subject: [PATCH 44/59] Use shorter method names --- libbeat/monitoring/report/elasticsearch/client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index a96de20fa91..a366535c8df 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -132,9 +132,9 @@ func (c *publishClient) Publish(batch publisher.Batch) error { switch c.format { case report.ReportingFormatXPackMonitoringBulk: - err = c.publishWithXPackMonitoringBulk(params, event, typ) + err = c.publishXPackBulk(params, event, typ) case report.ReportingFormatBulk: - err = c.publishWithBulk(event) + err = c.publishBulk(event) } if err != nil { @@ -159,7 +159,7 @@ func (c *publishClient) String() string { return "publish(" + c.es.String() + ")" } -func (c *publishClient) publishWithXPackMonitoringBulk(params map[string]string, event publisher.Event, typ string) error { +func (c *publishClient) publishXPackBulk(params map[string]string, event publisher.Event, typ string) error { meta := common.MapStr{ "_index": "", "_routing": nil, @@ -179,7 +179,7 @@ func (c *publishClient) publishWithXPackMonitoringBulk(params map[string]string, return err } -func (c *publishClient) publishWithBulk(event publisher.Event) error { +func (c *publishClient) publishBulk(event publisher.Event) error { meta := common.MapStr{ "_index": getMonitoringIndexName(), "_routing": nil, From dc800e335d7904f14aa01401747bf3f53a12dbbb Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 22 Mar 2019 13:21:05 -0700 Subject: [PATCH 45/59] Remove duplication of logic --- .../monitoring/report/elasticsearch/client.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index a366535c8df..b51a8e3f6f1 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -134,7 +134,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error { case report.ReportingFormatXPackMonitoringBulk: err = c.publishXPackBulk(params, event, typ) case report.ReportingFormatBulk: - err = c.publishBulk(event) + err = c.publishBulk(event, typ) } if err != nil { @@ -179,7 +179,7 @@ func (c *publishClient) publishXPackBulk(params map[string]string, event publish return err } -func (c *publishClient) publishBulk(event publisher.Event) error { +func (c *publishClient) publishBulk(event publisher.Event, typ string) error { meta := common.MapStr{ "_index": getMonitoringIndexName(), "_routing": nil, @@ -195,17 +195,9 @@ func (c *publishClient) publishBulk(event publisher.Event) error { event.Content.Fields.Put("timestamp", event.Content.Timestamp) - t, err := event.Content.Meta.GetValue("type") - if err != nil { - return errors.Wrap(err, "could not determine type field") - } - tStr, ok := t.(string) - if !ok { - return fmt.Errorf("type is not string") - } fields := common.MapStr{ - "type": tStr, - tStr: event.Content.Fields, + "type": typ, + typ: event.Content.Fields, } interval, err := event.Content.Meta.GetValue("interval_ms") From 2059928f34eee3e77ee5ca7a15ac83df73d6ac40 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 22 Mar 2019 15:07:21 -0700 Subject: [PATCH 46/59] Refactoring: move selectMonitoringConfig to monitoring.SelectConfig --- libbeat/cmd/instance/beat.go | 30 +++++--------------------- libbeat/monitoring/monitoring.go | 36 +++++++++++++++++++++++++++++++- 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 9209e41a136..d18a3e87916 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -49,7 +49,6 @@ import ( "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/cloudid" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/file" "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/common/seccomp" @@ -104,9 +103,10 @@ type beatConfig struct { Keystore *common.Config `config:"keystore"` // output/publishing related configurations - Pipeline pipeline.Config `config:",inline"` - XPackMonitoring *common.Config `config:"xpack.monitoring"` - Monitoring *common.Config `config:"monitoring"` + Pipeline pipeline.Config `config:",inline"` + + // monitoring settings + monitoring.MonitoringBeatConfig `config:",inline"` // central management settings Management *common.Config `config:"management"` @@ -369,7 +369,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { return err } - monitoringCfg, err := selectMonitoringConfig(b.Config) + monitoringCfg, err := monitoring.SelectConfig(b.Config.MonitoringBeatConfig) if err != nil { return err } @@ -998,23 +998,3 @@ func initPaths(cfg *common.Config) error { } return nil } - -func selectMonitoringConfig(beatCfg beatConfig) (*common.Config, error) { - switch { - case beatCfg.Monitoring.Enabled() && beatCfg.XPackMonitoring.Enabled(): - const errMonitoringBothConfigEnabled = "both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.elasticsearch.hosts to monitoring cluster hosts" - return nil, errors.New(errMonitoringBothConfigEnabled) - case beatCfg.XPackMonitoring.Enabled(): - const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.elasticsearch.hosts to monitoring cluster hosts" - cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) - monitoringCfg := beatCfg.XPackMonitoring - monitoringCfg.SetInt("_format", -1, int64(report.ReportingFormatXPackMonitoringBulk)) - return monitoringCfg, nil - case beatCfg.Monitoring.Enabled(): - monitoringCfg := beatCfg.Monitoring - monitoringCfg.SetInt("_format", -1, int64(report.ReportingFormatBulk)) - return monitoringCfg, nil - default: - return nil, nil - } -} diff --git a/libbeat/monitoring/monitoring.go b/libbeat/monitoring/monitoring.go index 943f4e79e1b..4b444a7e607 100644 --- a/libbeat/monitoring/monitoring.go +++ b/libbeat/monitoring/monitoring.go @@ -17,7 +17,19 @@ package monitoring -import "errors" +import ( + "errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/monitoring/report" +) + +// MonitoringBeatConfig represents the part of the $BEAT.yml to do with monitoring settings +type MonitoringBeatConfig struct { + XPackMonitoring *common.Config `config:"xpack.monitoring"` + Monitoring *common.Config `config:"monitoring"` +} type Mode uint8 @@ -67,3 +79,25 @@ func Remove(name string) { func Clear() error { return Default.Clear() } + +// SelectConfig selects the appropriate monitoring configuration based on the user's settings in $BEAT.yml. Users may either +// use xpack.monitoring.* settings OR monitoring.* settings but not both. +func SelectConfig(beatCfg MonitoringBeatConfig) (*common.Config, error) { + switch { + case beatCfg.Monitoring.Enabled() && beatCfg.XPackMonitoring.Enabled(): + errMonitoringBothConfigEnabled := errors.New("both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.elasticsearch.hosts to monitoring cluster hosts") + return nil, errMonitoringBothConfigEnabled + case beatCfg.XPackMonitoring.Enabled(): + const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.elasticsearch.hosts to monitoring cluster hosts" + cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) + monitoringCfg := beatCfg.XPackMonitoring + monitoringCfg.SetInt("_format", -1, int64(report.ReportingFormatXPackMonitoringBulk)) + return monitoringCfg, nil + case beatCfg.Monitoring.Enabled(): + monitoringCfg := beatCfg.Monitoring + monitoringCfg.SetInt("_format", -1, int64(report.ReportingFormatBulk)) + return monitoringCfg, nil + default: + return nil, nil + } +} From a367a9f377c233c8c314af8946c5577846a1109e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 22 Mar 2019 15:17:36 -0700 Subject: [PATCH 47/59] Update schema version --- libbeat/monitoring/report/elasticsearch/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index b51a8e3f6f1..1950cf12f3c 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -226,7 +226,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error { } func getMonitoringIndexName() string { - version := 6 + version := 7 date := time.Now().Format("2006.01.02") return fmt.Sprintf(".monitoring-beats-%v-%s", version, date) } From 9b824ec696fe089a30733d3b0c65f0b6c5c81417 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 26 Mar 2019 08:21:00 -0700 Subject: [PATCH 48/59] Move variable into callback fn's scope so memory is not shared across callback calls --- libbeat/cmd/instance/beat.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index d18a3e87916..1fb99ec6fbf 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -815,11 +815,11 @@ func (b *Beat) registerClusterUUIDFetching() error { // Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, error) { - var response struct { - ClusterUUID string `json:"cluster_uuid"` - } - callback := func(esClient *elasticsearch.Client) error { + var response struct { + ClusterUUID string `json:"cluster_uuid"` + } + status, body, err := esClient.Request("GET", "/", "", nil, nil) if err != nil { return errw.Wrap(err, "error querying /") From 27c7c5f9460eb6dc880ad2cd7080dafd7c13fa1f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 26 Mar 2019 08:42:50 -0700 Subject: [PATCH 49/59] Fixing scope of registry and var creations so they don't happen on every callback call --- libbeat/cmd/instance/beat.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 1fb99ec6fbf..ace2a71e982 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -815,6 +815,10 @@ func (b *Beat) registerClusterUUIDFetching() error { // Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, error) { + stateRegistry := monitoring.GetNamespace("state").GetRegistry() + elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch") + clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid") + callback := func(esClient *elasticsearch.Client) error { var response struct { ClusterUUID string `json:"cluster_uuid"` @@ -832,11 +836,7 @@ func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, err return fmt.Errorf("Error unmarshaling json when querying /. Body: %s", body) } - stateRegistry := monitoring.GetNamespace("state").GetRegistry() - outputsRegistry := stateRegistry.NewRegistry("outputs") - elasticsearchRegistry := outputsRegistry.NewRegistry("elasticsearch") - monitoring.NewString(elasticsearchRegistry, "cluster_uuid").Set(response.ClusterUUID) - + clusterUUIDRegVar.Set(response.ClusterUUID) return nil } From 3e58fe54a0b16d629ea26e1b42b35a4b0371f6dc Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 29 Mar 2019 09:16:26 -0700 Subject: [PATCH 50/59] Remove naming stutter --- libbeat/cmd/instance/beat.go | 2 +- libbeat/monitoring/monitoring.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index ace2a71e982..9660a3da79a 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -106,7 +106,7 @@ type beatConfig struct { Pipeline pipeline.Config `config:",inline"` // monitoring settings - monitoring.MonitoringBeatConfig `config:",inline"` + MonitoringBeatConfig monitoring.BeatConfig `config:",inline"` // central management settings Management *common.Config `config:"management"` diff --git a/libbeat/monitoring/monitoring.go b/libbeat/monitoring/monitoring.go index 4b444a7e607..9475c0ce787 100644 --- a/libbeat/monitoring/monitoring.go +++ b/libbeat/monitoring/monitoring.go @@ -25,8 +25,8 @@ import ( "github.com/elastic/beats/libbeat/monitoring/report" ) -// MonitoringBeatConfig represents the part of the $BEAT.yml to do with monitoring settings -type MonitoringBeatConfig struct { +// BeatConfig represents the part of the $BEAT.yml to do with monitoring settings +type BeatConfig struct { XPackMonitoring *common.Config `config:"xpack.monitoring"` Monitoring *common.Config `config:"monitoring"` } @@ -82,7 +82,7 @@ func Clear() error { // SelectConfig selects the appropriate monitoring configuration based on the user's settings in $BEAT.yml. Users may either // use xpack.monitoring.* settings OR monitoring.* settings but not both. -func SelectConfig(beatCfg MonitoringBeatConfig) (*common.Config, error) { +func SelectConfig(beatCfg BeatConfig) (*common.Config, error) { switch { case beatCfg.Monitoring.Enabled() && beatCfg.XPackMonitoring.Enabled(): errMonitoringBothConfigEnabled := errors.New("both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.elasticsearch.hosts to monitoring cluster hosts") From d8ffa49d9bd4e4cf209282b23b032c4eb1ce3bbf Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 29 Mar 2019 10:52:27 -0700 Subject: [PATCH 51/59] Reduce hackiness with passing reporter format --- libbeat/cmd/instance/beat.go | 3 ++- libbeat/monitoring/monitoring.go | 12 +++++------ .../report/elasticsearch/elasticsearch.go | 4 ++++ libbeat/monitoring/report/report.go | 20 +++++++------------ 4 files changed, 18 insertions(+), 21 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 9660a3da79a..a4ff28a6f9e 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -369,7 +369,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { return err } - monitoringCfg, err := monitoring.SelectConfig(b.Config.MonitoringBeatConfig) + monitoringCfg, reporterSettings, err := monitoring.SelectConfig(b.Config.MonitoringBeatConfig) if err != nil { return err } @@ -377,6 +377,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { if monitoringCfg.Enabled() { settings := report.Settings{ DefaultUsername: settings.Monitoring.DefaultUsername, + Format: reporterSettings.Format, } reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output) if err != nil { diff --git a/libbeat/monitoring/monitoring.go b/libbeat/monitoring/monitoring.go index 9475c0ce787..2fe9dffbc59 100644 --- a/libbeat/monitoring/monitoring.go +++ b/libbeat/monitoring/monitoring.go @@ -82,22 +82,20 @@ func Clear() error { // SelectConfig selects the appropriate monitoring configuration based on the user's settings in $BEAT.yml. Users may either // use xpack.monitoring.* settings OR monitoring.* settings but not both. -func SelectConfig(beatCfg BeatConfig) (*common.Config, error) { +func SelectConfig(beatCfg BeatConfig) (*common.Config, *report.Settings, error) { switch { case beatCfg.Monitoring.Enabled() && beatCfg.XPackMonitoring.Enabled(): errMonitoringBothConfigEnabled := errors.New("both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.elasticsearch.hosts to monitoring cluster hosts") - return nil, errMonitoringBothConfigEnabled + return nil, nil, errMonitoringBothConfigEnabled case beatCfg.XPackMonitoring.Enabled(): const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.elasticsearch.hosts to monitoring cluster hosts" cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) monitoringCfg := beatCfg.XPackMonitoring - monitoringCfg.SetInt("_format", -1, int64(report.ReportingFormatXPackMonitoringBulk)) - return monitoringCfg, nil + return monitoringCfg, &report.Settings{Format: report.ReportingFormatXPackMonitoringBulk}, nil case beatCfg.Monitoring.Enabled(): monitoringCfg := beatCfg.Monitoring - monitoringCfg.SetInt("_format", -1, int64(report.ReportingFormatBulk)) - return monitoringCfg, nil + return monitoringCfg, &report.Settings{Format: report.ReportingFormatBulk}, nil default: - return nil, nil + return nil, nil, nil } } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index e17053f7ce5..3cee586bdce 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -105,6 +105,10 @@ func defaultConfig(settings report.Settings) config { c.Username = settings.DefaultUsername } + if settings.Format != report.ReportingFormatUnknown { + c.Format = settings.Format + } + return c } diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index 0af15b45dab..353316a3a53 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -25,10 +25,11 @@ import ( "github.com/elastic/beats/libbeat/common" ) -// ReportingFormat encodes the type of format to report monitoring data in +// ReportingFormat encodes the type of format to report monitoring data in. type ReportingFormat int -// Enumerations of various ReportingFormats +// Enumerations of various ReportingFormats. A reporter can choose whether to +// interpret this setting or not, and if so, how to interpret it. const ( ReportingFormatUnknown ReportingFormat = iota // to protect against zero-value errors ReportingFormatXPackMonitoringBulk @@ -42,6 +43,7 @@ type config struct { type Settings struct { DefaultUsername string + Format ReportingFormat } type Reporter interface { @@ -69,7 +71,7 @@ func New( cfg *common.Config, outputs common.ConfigNamespace, ) (Reporter, error) { - name, cfg, err := getReporterConfig(cfg, outputs) + name, cfg, err := getReporterConfig(cfg, settings, outputs) if err != nil { return nil, err } @@ -84,6 +86,7 @@ func New( func getReporterConfig( monitoringConfig *common.Config, + settings Settings, outputs common.ConfigNamespace, ) (string, *common.Config, error) { cfg := collectSubObject(monitoringConfig) @@ -92,12 +95,6 @@ func getReporterConfig( return "", nil, err } - f, err := monitoringConfig.Int("_format", -1) - if err != nil { - return "", nil, err - } - format := ReportingFormat(f) - // load reporter from `monitoring` section and optionally // merge with output settings if config.Reporter.IsSet() { @@ -112,7 +109,7 @@ func getReporterConfig( }{} rc.Unpack(&hosts) - if format == ReportingFormatBulk && len(hosts.Hosts) > 0 { + if settings.Format == ReportingFormatBulk && len(hosts.Hosts) > 0 { pathMonHosts := rc.PathOf("hosts") pathOutHost := outCfg.PathOf("hosts") err := fmt.Errorf("'%v' and '%v' are configured", pathMonHosts, pathOutHost) @@ -126,7 +123,6 @@ func getReporterConfig( rc = merged } - rc.SetInt("_format", -1, int64(format)) return name, rc, nil } @@ -134,8 +130,6 @@ func getReporterConfig( if outputs.IsSet() { name := outputs.Name() if reportFactories[name] != nil { - outCfg := outputs.Config() - outCfg.SetInt("_format", -1, int64(format)) return name, outputs.Config(), nil } } From 35aa290a15049a984aac2e5ecaeca97b9dd03142 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Fri, 29 Mar 2019 11:25:27 -0700 Subject: [PATCH 52/59] Move error vars to package scope --- libbeat/monitoring/monitoring.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/libbeat/monitoring/monitoring.go b/libbeat/monitoring/monitoring.go index 2fe9dffbc59..19c949ada4f 100644 --- a/libbeat/monitoring/monitoring.go +++ b/libbeat/monitoring/monitoring.go @@ -42,6 +42,11 @@ const ( Full ) +var ( + errMonitoringBothConfigEnabled = errors.New("both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.elasticsearch.hosts to monitoring cluster hosts") + warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.elasticsearch.hosts to monitoring cluster hosts" +) + // Default is the global default metrics registry provided by the monitoring package. var Default = NewRegistry() @@ -85,10 +90,8 @@ func Clear() error { func SelectConfig(beatCfg BeatConfig) (*common.Config, *report.Settings, error) { switch { case beatCfg.Monitoring.Enabled() && beatCfg.XPackMonitoring.Enabled(): - errMonitoringBothConfigEnabled := errors.New("both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.elasticsearch.hosts to monitoring cluster hosts") return nil, nil, errMonitoringBothConfigEnabled case beatCfg.XPackMonitoring.Enabled(): - const warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.elasticsearch.hosts to monitoring cluster hosts" cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) monitoringCfg := beatCfg.XPackMonitoring return monitoringCfg, &report.Settings{Format: report.ReportingFormatXPackMonitoringBulk}, nil From ba6b23159ff5a6f92403b180c282836e35eea69e Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 2 Apr 2019 09:06:04 -0700 Subject: [PATCH 53/59] Removing stutter --- libbeat/monitoring/monitoring.go | 4 +-- .../monitoring/report/elasticsearch/client.go | 8 ++--- .../monitoring/report/elasticsearch/config.go | 32 +++++++++---------- .../report/elasticsearch/elasticsearch.go | 6 ++-- libbeat/monitoring/report/report.go | 16 +++++----- 5 files changed, 33 insertions(+), 33 deletions(-) diff --git a/libbeat/monitoring/monitoring.go b/libbeat/monitoring/monitoring.go index 19c949ada4f..6e19fa50044 100644 --- a/libbeat/monitoring/monitoring.go +++ b/libbeat/monitoring/monitoring.go @@ -94,10 +94,10 @@ func SelectConfig(beatCfg BeatConfig) (*common.Config, *report.Settings, error) case beatCfg.XPackMonitoring.Enabled(): cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) monitoringCfg := beatCfg.XPackMonitoring - return monitoringCfg, &report.Settings{Format: report.ReportingFormatXPackMonitoringBulk}, nil + return monitoringCfg, &report.Settings{Format: report.FormatXPackMonitoringBulk}, nil case beatCfg.Monitoring.Enabled(): monitoringCfg := beatCfg.Monitoring - return monitoringCfg, &report.Settings{Format: report.ReportingFormatBulk}, nil + return monitoringCfg, &report.Settings{Format: report.FormatBulk}, nil default: return nil, nil, nil } diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 1950cf12f3c..2a312856ec0 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -35,13 +35,13 @@ import ( type publishClient struct { es *esout.Client params map[string]string - format report.ReportingFormat + format report.Format } func newPublishClient( es *esout.Client, params map[string]string, - format report.ReportingFormat, + format report.Format, ) (*publishClient, error) { p := &publishClient{ es: es, @@ -131,9 +131,9 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } switch c.format { - case report.ReportingFormatXPackMonitoringBulk: + case report.FormatXPackMonitoringBulk: err = c.publishXPackBulk(params, event, typ) - case report.ReportingFormatBulk: + case report.FormatBulk: err = c.publishBulk(event, typ) } diff --git a/libbeat/monitoring/report/elasticsearch/config.go b/libbeat/monitoring/report/elasticsearch/config.go index 0a232817bea..104a78fa669 100644 --- a/libbeat/monitoring/report/elasticsearch/config.go +++ b/libbeat/monitoring/report/elasticsearch/config.go @@ -30,22 +30,22 @@ import ( type config struct { Hosts []string Protocol string - Params map[string]string `config:"parameters"` - Headers map[string]string `config:"headers"` - Username string `config:"username"` - Password string `config:"password"` - ProxyURL string `config:"proxy_url"` - CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` - TLS *tlscommon.Config `config:"ssl"` - MaxRetries int `config:"max_retries"` - Timeout time.Duration `config:"timeout"` - MetricsPeriod time.Duration `config:"metrics.period"` - StatePeriod time.Duration `config:"state.period"` - BulkMaxSize int `config:"bulk_max_size" validate:"min=0"` - BufferSize int `config:"buffer_size"` - Tags []string `config:"tags"` - Backoff backoff `config:"backoff"` - Format report.ReportingFormat `config:"_format"` + Params map[string]string `config:"parameters"` + Headers map[string]string `config:"headers"` + Username string `config:"username"` + Password string `config:"password"` + ProxyURL string `config:"proxy_url"` + CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` + TLS *tlscommon.Config `config:"ssl"` + MaxRetries int `config:"max_retries"` + Timeout time.Duration `config:"timeout"` + MetricsPeriod time.Duration `config:"metrics.period"` + StatePeriod time.Duration `config:"state.period"` + BulkMaxSize int `config:"bulk_max_size" validate:"min=0"` + BufferSize int `config:"buffer_size"` + Tags []string `config:"tags"` + Backoff backoff `config:"backoff"` + Format report.Format `config:"_format"` } type backoff struct { diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 3cee586bdce..56f95490fa7 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -98,14 +98,14 @@ func defaultConfig(settings report.Settings) config { Init: 1 * time.Second, Max: 60 * time.Second, }, - Format: report.ReportingFormatXPackMonitoringBulk, + Format: report.FormatXPackMonitoringBulk, } if settings.DefaultUsername != "" { c.Username = settings.DefaultUsername } - if settings.Format != report.ReportingFormatUnknown { + if settings.Format != report.FormatUnknown { c.Format = settings.Format } @@ -347,7 +347,7 @@ func makeClient( return nil, err } - if config.Format != report.ReportingFormatXPackMonitoringBulk && config.Format != report.ReportingFormatBulk { + if config.Format != report.FormatXPackMonitoringBulk && config.Format != report.FormatBulk { return nil, fmt.Errorf("unknown reporting format: %v", config.Format) } diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index 353316a3a53..42172bb0913 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -25,15 +25,15 @@ import ( "github.com/elastic/beats/libbeat/common" ) -// ReportingFormat encodes the type of format to report monitoring data in. -type ReportingFormat int +// Format encodes the type of format to report monitoring data in. +type Format int -// Enumerations of various ReportingFormats. A reporter can choose whether to +// Enumerations of various Formats. A reporter can choose whether to // interpret this setting or not, and if so, how to interpret it. const ( - ReportingFormatUnknown ReportingFormat = iota // to protect against zero-value errors - ReportingFormatXPackMonitoringBulk - ReportingFormatBulk + FormatUnknown Format = iota // to protect against zero-value errors + FormatXPackMonitoringBulk + FormatBulk ) type config struct { @@ -43,7 +43,7 @@ type config struct { type Settings struct { DefaultUsername string - Format ReportingFormat + Format Format } type Reporter interface { @@ -109,7 +109,7 @@ func getReporterConfig( }{} rc.Unpack(&hosts) - if settings.Format == ReportingFormatBulk && len(hosts.Hosts) > 0 { + if settings.Format == FormatBulk && len(hosts.Hosts) > 0 { pathMonHosts := rc.PathOf("hosts") pathOutHost := outCfg.PathOf("hosts") err := fmt.Errorf("'%v' and '%v' are configured", pathMonHosts, pathOutHost) From 1ce9ef8aeb52e031f40b9993e9e2e38603138edf Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 2 Apr 2019 09:07:01 -0700 Subject: [PATCH 54/59] Adding extra godoc about format usage --- libbeat/monitoring/report/report.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index 42172bb0913..e3e2cc6a43d 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -25,7 +25,8 @@ import ( "github.com/elastic/beats/libbeat/common" ) -// Format encodes the type of format to report monitoring data in. +// Format encodes the type of format to report monitoring data in. This +// is currently only being used by the elaticsearch reporter. type Format int // Enumerations of various Formats. A reporter can choose whether to From bf9dcb5c8d0a251b3b0a01cc16dabd85fa1352c1 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 2 Apr 2019 10:33:24 -0700 Subject: [PATCH 55/59] Extend godoc to explain the rationale behind the format hack --- libbeat/monitoring/report/report.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index e3e2cc6a43d..8af32bd084e 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -27,6 +27,11 @@ import ( // Format encodes the type of format to report monitoring data in. This // is currently only being used by the elaticsearch reporter. +// This is a hack that is necessary so we can map certain monitoring +// configuration options to certain behaviors in reporters. Depending on +// the configuration option used, the correct format is set, and reporters +// that know how to interpret the format use it to choose the appropriate +// reporting behavior. type Format int // Enumerations of various Formats. A reporter can choose whether to From 1c13224d70b623f20d14f9a6aa44c0a593334849 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 2 Apr 2019 17:21:30 -0700 Subject: [PATCH 56/59] Debugging docker container logs for system tests --- libbeat/scripts/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/scripts/Makefile b/libbeat/scripts/Makefile index a007eca7b29..017124a8214 100755 --- a/libbeat/scripts/Makefile +++ b/libbeat/scripts/Makefile @@ -221,7 +221,7 @@ system-tests: prepare-tests ${BEAT_NAME}.test python-env .PHONY: system-tests-environment system-tests-environment: ## @testing Runs the system tests inside a virtual environment. This can be run on any docker-machine (local, remote) system-tests-environment: prepare-tests build-image - ${DOCKER_COMPOSE} run -e INTEGRATION_TESTS=1 -e TESTING_ENVIRONMENT=${TESTING_ENVIRONMENT} -e DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} beat make system-tests + ${DOCKER_COMPOSE} run -e INTEGRATION_TESTS=1 -e TESTING_ENVIRONMENT=${TESTING_ENVIRONMENT} -e DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} beat make system-tests || ${DOCKER_COMPOSE} logs --tail 200 .PHONY: fast-system-tests fast-system-tests: ## @testing Runs system tests without coverage reports and in parallel From cd3afe53f14e9a970462eee7f42411381c11924f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 2 Apr 2019 22:50:09 -0700 Subject: [PATCH 57/59] Remove _type from bulk request URI --- libbeat/monitoring/report/elasticsearch/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 2a312856ec0..3169c9887e7 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -221,7 +221,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error { // Currently one request per event is sent. Reason is that each event can contain different // interval params and X-Pack requires to send the interval param. // FIXME: index name (first param below) - _, err = c.es.BulkWith(getMonitoringIndexName(), "doc", nil, nil, bulk[:]) + _, err = c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:]) return err } From 5943f2fb29916f9d3e36479fb71c20d352dfacc8 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 2 Apr 2019 23:22:16 -0700 Subject: [PATCH 58/59] Removing debugging statement --- libbeat/scripts/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/scripts/Makefile b/libbeat/scripts/Makefile index 017124a8214..a007eca7b29 100755 --- a/libbeat/scripts/Makefile +++ b/libbeat/scripts/Makefile @@ -221,7 +221,7 @@ system-tests: prepare-tests ${BEAT_NAME}.test python-env .PHONY: system-tests-environment system-tests-environment: ## @testing Runs the system tests inside a virtual environment. This can be run on any docker-machine (local, remote) system-tests-environment: prepare-tests build-image - ${DOCKER_COMPOSE} run -e INTEGRATION_TESTS=1 -e TESTING_ENVIRONMENT=${TESTING_ENVIRONMENT} -e DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} beat make system-tests || ${DOCKER_COMPOSE} logs --tail 200 + ${DOCKER_COMPOSE} run -e INTEGRATION_TESTS=1 -e TESTING_ENVIRONMENT=${TESTING_ENVIRONMENT} -e DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} beat make system-tests .PHONY: fast-system-tests fast-system-tests: ## @testing Runs system tests without coverage reports and in parallel From c64a59b11f9c5d9ed1d42a2cc4991734a3c26009 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 3 Apr 2019 05:17:19 -0700 Subject: [PATCH 59/59] Fixing logic --- libbeat/monitoring/report/report.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index 8af32bd084e..203371eee80 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -115,7 +115,7 @@ func getReporterConfig( }{} rc.Unpack(&hosts) - if settings.Format == FormatBulk && len(hosts.Hosts) > 0 { + if settings.Format == FormatXPackMonitoringBulk && len(hosts.Hosts) > 0 { pathMonHosts := rc.PathOf("hosts") pathOutHost := outCfg.PathOf("hosts") err := fmt.Errorf("'%v' and '%v' are configured", pathMonHosts, pathOutHost)