diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fe7064ed32b0..c1ad64017f8e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - New processor: `copy_fields`. {pull}11303[11303] - Add `error.message` to events when `fail_on_error` is set in `rename` and `copy_fields` processors. {pull}11303[11303] - New processor: `truncate_fields`. {pull}11297[11297] +- Allow a beat to ship monitoring data directly to an Elasticsearch monitoring clsuter. {pull}9260[9260] *Auditbeat* diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index f76f2047cf1e..a4ff28a6f9e3 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -39,6 +39,10 @@ import ( errw "github.com/pkg/errors" "go.uber.org/zap" + sysinfo "github.com/elastic/go-sysinfo" + "github.com/elastic/go-sysinfo/types" + ucfg "github.com/elastic/go-ucfg" + "github.com/elastic/beats/libbeat/api" "github.com/elastic/beats/libbeat/asset" "github.com/elastic/beats/libbeat/beat" @@ -66,9 +70,6 @@ import ( "github.com/elastic/beats/libbeat/publisher/processing" svc "github.com/elastic/beats/libbeat/service" "github.com/elastic/beats/libbeat/version" - sysinfo "github.com/elastic/go-sysinfo" - "github.com/elastic/go-sysinfo/types" - ucfg "github.com/elastic/go-ucfg" ) // Beat provides the runnable and configurable instance of a beat. @@ -102,8 +103,10 @@ type beatConfig struct { Keystore *common.Config `config:"keystore"` // output/publishing related configurations - Pipeline pipeline.Config `config:",inline"` - Monitoring *common.Config `config:"xpack.monitoring"` + Pipeline pipeline.Config `config:",inline"` + + // monitoring settings + MonitoringBeatConfig monitoring.BeatConfig `config:",inline"` // central management settings Management *common.Config `config:"management"` @@ -284,6 +287,11 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { return nil, err } + err = b.registerClusterUUIDFetching() + if err != nil { + return nil, err + } + reg := monitoring.Default.GetRegistry("libbeat") if reg == nil { reg = monitoring.Default.NewRegistry("libbeat") @@ -361,11 +369,17 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { return err } - if b.Config.Monitoring.Enabled() { + monitoringCfg, reporterSettings, err := monitoring.SelectConfig(b.Config.MonitoringBeatConfig) + if err != nil { + return err + } + + if monitoringCfg.Enabled() { settings := report.Settings{ DefaultUsername: settings.Monitoring.DefaultUsername, + Format: reporterSettings.Format, } - reporter, err := report.New(b.Info, settings, b.Config.Monitoring, b.Config.Output) + reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output) if err != nil { return err } @@ -759,8 +773,7 @@ func (b *Beat) registerESIndexManagement() error { return nil } -// Build and return a callback to load index template into ES -func (b *Beat) indexSetupCallback() func(esClient *elasticsearch.Client) error { +func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback { return func(esClient *elasticsearch.Client) error { m := b.index.Manager(esClient, idxmgmt.BeatsAssets(b.Fields)) return m.Setup(true, true) @@ -790,6 +803,47 @@ func (b *Beat) createOutput(stats outputs.Observer, cfg common.ConfigNamespace) return outputs.Load(b.index, b.Info, stats, cfg.Name(), cfg.Config()) } +func (b *Beat) registerClusterUUIDFetching() error { + if b.Config.Output.Name() == "elasticsearch" { + callback, err := b.clusterUUIDFetchingCallback() + if err != nil { + return err + } + elasticsearch.RegisterConnectCallback(callback) + } + return nil +} + +// Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring +func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, error) { + stateRegistry := monitoring.GetNamespace("state").GetRegistry() + elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch") + clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid") + + callback := func(esClient *elasticsearch.Client) error { + var response struct { + ClusterUUID string `json:"cluster_uuid"` + } + + status, body, err := esClient.Request("GET", "/", "", nil, nil) + if err != nil { + return errw.Wrap(err, "error querying /") + } + if status > 299 { + return fmt.Errorf("Error querying /. Status: %d. Response body: %s", status, body) + } + err = json.Unmarshal(body, &response) + if err != nil { + return fmt.Errorf("Error unmarshaling json when querying /. Body: %s", body) + } + + clusterUUIDRegVar.Set(response.ClusterUUID) + return nil + } + + return callback, nil +} + // handleError handles the given error by logging it and then returning the // error. If the err is nil or is a GracefulExit error then the method will // return nil without logging anything. diff --git a/libbeat/monitoring/monitoring.go b/libbeat/monitoring/monitoring.go index 943f4e79e1b8..6e19fa50044a 100644 --- a/libbeat/monitoring/monitoring.go +++ b/libbeat/monitoring/monitoring.go @@ -17,7 +17,19 @@ package monitoring -import "errors" +import ( + "errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/monitoring/report" +) + +// BeatConfig represents the part of the $BEAT.yml to do with monitoring settings +type BeatConfig struct { + XPackMonitoring *common.Config `config:"xpack.monitoring"` + Monitoring *common.Config `config:"monitoring"` +} type Mode uint8 @@ -30,6 +42,11 @@ const ( Full ) +var ( + errMonitoringBothConfigEnabled = errors.New("both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.elasticsearch.hosts to monitoring cluster hosts") + warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.elasticsearch.hosts to monitoring cluster hosts" +) + // Default is the global default metrics registry provided by the monitoring package. var Default = NewRegistry() @@ -67,3 +84,21 @@ func Remove(name string) { func Clear() error { return Default.Clear() } + +// SelectConfig selects the appropriate monitoring configuration based on the user's settings in $BEAT.yml. Users may either +// use xpack.monitoring.* settings OR monitoring.* settings but not both. +func SelectConfig(beatCfg BeatConfig) (*common.Config, *report.Settings, error) { + switch { + case beatCfg.Monitoring.Enabled() && beatCfg.XPackMonitoring.Enabled(): + return nil, nil, errMonitoringBothConfigEnabled + case beatCfg.XPackMonitoring.Enabled(): + cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig) + monitoringCfg := beatCfg.XPackMonitoring + return monitoringCfg, &report.Settings{Format: report.FormatXPackMonitoringBulk}, nil + case beatCfg.Monitoring.Enabled(): + monitoringCfg := beatCfg.Monitoring + return monitoringCfg, &report.Settings{Format: report.FormatBulk}, nil + default: + return nil, nil, nil + } +} diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 63be94cf324a..3169c9887e79 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -20,6 +20,7 @@ package elasticsearch import ( "encoding/json" "fmt" + "time" "github.com/pkg/errors" @@ -34,17 +35,20 @@ import ( type publishClient struct { es *esout.Client params map[string]string + format report.Format } func newPublishClient( es *esout.Client, params map[string]string, -) *publishClient { + format report.Format, +) (*publishClient, error) { p := &publishClient{ es: es, params: params, + format: format, } - return p + return p, nil } func (c *publishClient) Connect() error { @@ -84,6 +88,7 @@ func (c *publishClient) Connect() error { } debugf("XPack monitoring is enabled") + return nil } @@ -97,13 +102,18 @@ func (c *publishClient) Publish(batch publisher.Batch) error { var reason error for _, event := range events { - // Extract time + // Extract type t, err := event.Content.Meta.GetValue("type") if err != nil { logp.Err("Type not available in monitoring reported. Please report this error: %s", err) continue } + typ, ok := t.(string) + if !ok { + logp.Err("monitoring type is not a string") + } + var params = map[string]string{} // Copy params for k, v := range c.params { @@ -120,23 +130,13 @@ func (c *publishClient) Publish(batch publisher.Batch) error { } } - meta := common.MapStr{ - "_index": "", - "_routing": nil, - "_type": t, - } - bulk := [2]interface{}{ - common.MapStr{"index": meta}, - report.Event{ - Timestamp: event.Content.Timestamp, - Fields: event.Content.Fields, - }, + switch c.format { + case report.FormatXPackMonitoringBulk: + err = c.publishXPackBulk(params, event, typ) + case report.FormatBulk: + err = c.publishBulk(event, typ) } - // Currently one request per event is sent. Reason is that each event can contain different - // interval params and X-Pack requires to send the interval param. - _, err = c.es.SendMonitoringBulk(params, bulk[:]) - if err != nil { failed = append(failed, event) reason = err @@ -158,3 +158,75 @@ func (c *publishClient) Test(d testing.Driver) { func (c *publishClient) String() string { return "publish(" + c.es.String() + ")" } + +func (c *publishClient) publishXPackBulk(params map[string]string, event publisher.Event, typ string) error { + meta := common.MapStr{ + "_index": "", + "_routing": nil, + "_type": typ, + } + bulk := [2]interface{}{ + common.MapStr{"index": meta}, + report.Event{ + Timestamp: event.Content.Timestamp, + Fields: event.Content.Fields, + }, + } + + // Currently one request per event is sent. Reason is that each event can contain different + // interval params and X-Pack requires to send the interval param. + _, err := c.es.SendMonitoringBulk(params, bulk[:]) + return err +} + +func (c *publishClient) publishBulk(event publisher.Event, typ string) error { + meta := common.MapStr{ + "_index": getMonitoringIndexName(), + "_routing": nil, + } + + if c.es.GetVersion().Major < 7 { + meta["_type"] = "doc" + } + + action := common.MapStr{ + "index": meta, + } + + event.Content.Fields.Put("timestamp", event.Content.Timestamp) + + fields := common.MapStr{ + "type": typ, + typ: event.Content.Fields, + } + + interval, err := event.Content.Meta.GetValue("interval_ms") + if err != nil { + return errors.Wrap(err, "could not determine interval_ms field") + } + fields.Put("interval_ms", interval) + + clusterUUID, err := event.Content.Meta.GetValue("cluster_uuid") + if err != nil && err != common.ErrKeyNotFound { + return errors.Wrap(err, "could not determine cluster_uuid field") + } + fields.Put("cluster_uuid", clusterUUID) + + document := report.Event{ + Timestamp: event.Content.Timestamp, + Fields: fields, + } + bulk := [2]interface{}{action, document} + + // Currently one request per event is sent. Reason is that each event can contain different + // interval params and X-Pack requires to send the interval param. + // FIXME: index name (first param below) + _, err = c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:]) + return err +} + +func getMonitoringIndexName() string { + version := 7 + date := time.Now().Format("2006.01.02") + return fmt.Sprintf(".monitoring-beats-%v-%s", version, date) +} diff --git a/libbeat/monitoring/report/elasticsearch/config.go b/libbeat/monitoring/report/elasticsearch/config.go index 995f03bbc305..104a78fa6696 100644 --- a/libbeat/monitoring/report/elasticsearch/config.go +++ b/libbeat/monitoring/report/elasticsearch/config.go @@ -20,6 +20,8 @@ package elasticsearch import ( "time" + "github.com/elastic/beats/libbeat/monitoring/report" + "github.com/elastic/beats/libbeat/common/transport/tlscommon" ) @@ -43,6 +45,7 @@ type config struct { BufferSize int `config:"buffer_size"` Tags []string `config:"tags"` Backoff backoff `config:"backoff"` + Format report.Format `config:"_format"` } type backoff struct { diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 59d4406a7bf0..56f95490fa7b 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -19,6 +19,7 @@ package elasticsearch import ( "errors" + "fmt" "io" "math/rand" "net/url" @@ -97,12 +98,17 @@ func defaultConfig(settings report.Settings) config { Init: 1 * time.Second, Max: 60 * time.Second, }, + Format: report.FormatXPackMonitoringBulk, } if settings.DefaultUsername != "" { c.Username = settings.DefaultUsername } + if settings.Format != report.FormatUnknown { + c.Format = settings.Format + } + return c } @@ -291,15 +297,23 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration) if len(r.tags) > 0 { fields["tags"] = r.tags } + + meta := common.MapStr{ + "type": "beats_" + namespace, + "interval_ms": int64(period / time.Millisecond), + // Converting to seconds as interval only accepts `s` as unit + "params": map[string]string{"interval": strconv.Itoa(int(period/time.Second)) + "s"}, + } + + clusterUUID := getClusterUUID() + if clusterUUID != "" { + meta.Put("cluster_uuid", clusterUUID) + } + r.client.Publish(beat.Event{ Timestamp: ts, Fields: fields, - Meta: common.MapStr{ - "type": "beats_" + namespace, - "interval_ms": int64(period / time.Millisecond), - // Converting to seconds as interval only accepts `s` as unit - "params": map[string]string{"interval": strconv.Itoa(int(period/time.Second)) + "s"}, - }, + Meta: meta, }) } } @@ -333,7 +347,11 @@ func makeClient( return nil, err } - return newPublishClient(esClient, params), nil + if config.Format != report.FormatXPackMonitoringBulk && config.Format != report.FormatBulk { + return nil, fmt.Errorf("unknown reporting format: %v", config.Format) + } + + return newPublishClient(esClient, params, config.Format) } func closing(log *logp.Logger, c io.Closer) { @@ -367,3 +385,19 @@ func makeMeta(beat beat.Info) common.MapStr { "uuid": beat.ID, } } + +func getClusterUUID() string { + stateRegistry := monitoring.GetNamespace("state").GetRegistry() + outputsRegistry := stateRegistry.GetRegistry("outputs") + if outputsRegistry == nil { + return "" + } + + elasticsearchRegistry := outputsRegistry.GetRegistry("elasticsearch") + if elasticsearchRegistry == nil { + return "" + } + + snapshot := monitoring.CollectFlatSnapshot(elasticsearchRegistry, monitoring.Full, false) + return snapshot.Strings["cluster_uuid"] +} diff --git a/libbeat/monitoring/report/report.go b/libbeat/monitoring/report/report.go index 8e180de6aadb..203371eee80d 100644 --- a/libbeat/monitoring/report/report.go +++ b/libbeat/monitoring/report/report.go @@ -25,6 +25,23 @@ import ( "github.com/elastic/beats/libbeat/common" ) +// Format encodes the type of format to report monitoring data in. This +// is currently only being used by the elaticsearch reporter. +// This is a hack that is necessary so we can map certain monitoring +// configuration options to certain behaviors in reporters. Depending on +// the configuration option used, the correct format is set, and reporters +// that know how to interpret the format use it to choose the appropriate +// reporting behavior. +type Format int + +// Enumerations of various Formats. A reporter can choose whether to +// interpret this setting or not, and if so, how to interpret it. +const ( + FormatUnknown Format = iota // to protect against zero-value errors + FormatXPackMonitoringBulk + FormatBulk +) + type config struct { // allow for maximum one reporter being configured Reporter common.ConfigNamespace `config:",inline"` @@ -32,6 +49,7 @@ type config struct { type Settings struct { DefaultUsername string + Format Format } type Reporter interface { @@ -59,7 +77,7 @@ func New( cfg *common.Config, outputs common.ConfigNamespace, ) (Reporter, error) { - name, cfg, err := getReporterConfig(cfg, outputs) + name, cfg, err := getReporterConfig(cfg, settings, outputs) if err != nil { return nil, err } @@ -73,10 +91,11 @@ func New( } func getReporterConfig( - cfg *common.Config, + monitoringConfig *common.Config, + settings Settings, outputs common.ConfigNamespace, ) (string, *common.Config, error) { - cfg = collectSubObject(cfg) + cfg := collectSubObject(monitoringConfig) config := defaultConfig if err := cfg.Unpack(&config); err != nil { return "", nil, err @@ -96,7 +115,7 @@ func getReporterConfig( }{} rc.Unpack(&hosts) - if len(hosts.Hosts) > 0 { + if settings.Format == FormatXPackMonitoringBulk && len(hosts.Hosts) > 0 { pathMonHosts := rc.PathOf("hosts") pathOutHost := outCfg.PathOf("hosts") err := fmt.Errorf("'%v' and '%v' are configured", pathMonHosts, pathOutHost) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 42f9e69939b1..03f8dca7e83f 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -77,7 +77,8 @@ type ClientSettings struct { Observer outputs.Observer } -type connectCallback func(client *Client) error +// ConnectCallback defines the type for the function to be called when the Elasticsearch client successfully connects to the cluster +type ConnectCallback func(client *Client) error // Connection manages the connection for a given client. type Connection struct { diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 323f4c88a35d..0602ea4e7ca0 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -54,7 +54,7 @@ var ( // Callbacks must not depend on the result of a previous one, // because the ordering is not fixed. type callbacksRegistry struct { - callbacks map[uuid.UUID]connectCallback + callbacks map[uuid.UUID]ConnectCallback mutex sync.Mutex } @@ -66,7 +66,7 @@ var connectCallbackRegistry = newCallbacksRegistry() var globalCallbackRegistry = newCallbacksRegistry() // RegisterGlobalCallback register a global callbacks. -func RegisterGlobalCallback(callback connectCallback) (uuid.UUID, error) { +func RegisterGlobalCallback(callback ConnectCallback) (uuid.UUID, error) { globalCallbackRegistry.mutex.Lock() defer globalCallbackRegistry.mutex.Unlock() @@ -88,14 +88,14 @@ func RegisterGlobalCallback(callback connectCallback) (uuid.UUID, error) { func newCallbacksRegistry() callbacksRegistry { return callbacksRegistry{ - callbacks: make(map[uuid.UUID]connectCallback), + callbacks: make(map[uuid.UUID]ConnectCallback), } } // RegisterConnectCallback registers a callback for the elasticsearch output // The callback is called each time the client connects to elasticsearch. // It returns the key of the newly added callback, so it can be deregistered later. -func RegisterConnectCallback(callback connectCallback) (uuid.UUID, error) { +func RegisterConnectCallback(callback ConnectCallback) (uuid.UUID, error) { connectCallbackRegistry.mutex.Lock() defer connectCallbackRegistry.mutex.Unlock() diff --git a/libbeat/tests/system/config/mockbeat.yml.j2 b/libbeat/tests/system/config/mockbeat.yml.j2 index d714bd74d844..55f2ec97d5f2 100644 --- a/libbeat/tests/system/config/mockbeat.yml.j2 +++ b/libbeat/tests/system/config/mockbeat.yml.j2 @@ -109,5 +109,12 @@ xpack.monitoring.elasticsearch.metrics.period: 2s # to speed up tests xpack.monitoring.elasticsearch.state.period: 3s # to speed up tests {% endif -%} +{% if monitoring -%} +#================================ X-Pack Monitoring (direct) ===================================== +monitoring.elasticsearch.hosts: {{monitoring.elasticsearch.hosts}} +monitoring.elasticsearch.metrics.period: 2s # to speed up tests +monitoring.elasticsearch.state.period: 3s # to speed up tests +{% endif -%} + # vim: set ft=jinja: diff --git a/libbeat/tests/system/test_monitoring.py b/libbeat/tests/system/test_monitoring.py index 17ee66dbd5fb..1071cb5065d0 100644 --- a/libbeat/tests/system/test_monitoring.py +++ b/libbeat/tests/system/test_monitoring.py @@ -35,7 +35,9 @@ def test_via_output_cluster(self): } ) - self.clean() + self.clean_output_cluster() + self.clean_monitoring_cluster() + self.init_output_cluster() proc = self.start_beat(config="mockbeat.yml") self.wait_until(lambda: self.log_contains("mockbeat start running.")) @@ -45,19 +47,127 @@ def test_via_output_cluster(self): self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + proc.check_kill_and_wait() + for monitoring_doc_type in ['beats_stats', 'beats_state']: field_names = ['cluster_uuid', 'timestamp', 'interval_ms', 'type', 'source_node', monitoring_doc_type] self.assert_monitoring_doc_contains_fields(monitoring_doc_type, field_names) - def monitoring_doc_exists(self, monitoring_type): + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_direct_to_monitoring_cluster(self): + """ + Test shipping monitoring data directly to the monitoring cluster. + Make sure expected documents are indexed in monitoring cluster. + """ + + self.render_config_template( + "mockbeat", + monitoring={ + "elasticsearch": { + "hosts": [self.get_elasticsearch_monitoring_url()] + } + } + ) + + self.clean_output_cluster() + self.clean_monitoring_cluster() + + proc = self.start_beat(config="mockbeat.yml") + self.wait_until(lambda: self.log_contains("mockbeat start running.")) + self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) + self.wait_until(lambda: self.log_contains(re.compile( + "Connection to .*elasticsearch\("+self.get_elasticsearch_monitoring_url()+"\).* established"))) + self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) + self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + + proc.check_kill_and_wait() + + for monitoring_doc_type in ['beats_stats', 'beats_state']: + field_names = ['cluster_uuid', 'timestamp', 'interval_ms', 'type', monitoring_doc_type] + self.assert_monitoring_doc_contains_fields(monitoring_doc_type, field_names) + + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + @attr('integration') + def test_compare(self): + """ + Test that monitoring docs are the same, regardless of how they are shipped. + """ + + self.render_config_template( + "mockbeat", + xpack={ + "monitoring": { + "elasticsearch": { + "hosts": [self.get_elasticsearch_url()] + } + } + } + ) + + self.clean_output_cluster() + self.clean_monitoring_cluster() + self.init_output_cluster() + + proc = self.start_beat(config="mockbeat.yml") + self.wait_until(lambda: self.log_contains("mockbeat start running.")) + self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) + self.wait_until(lambda: self.log_contains(re.compile( + "Connection to .*elasticsearch\("+self.get_elasticsearch_url()+"\).* established"))) + self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) + self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + + proc.check_kill_and_wait() + + indirect_beats_stats_doc = self.get_monitoring_doc('beats_stats') + indirect_beats_state_doc = self.get_monitoring_doc('beats_state') + + self.render_config_template( + "mockbeat", + monitoring={ + "elasticsearch": { + "hosts": [self.get_elasticsearch_monitoring_url()] + } + } + ) + + self.clean_output_cluster() + self.clean_monitoring_cluster() + + proc = self.start_beat(config="mockbeat.yml") + self.wait_until(lambda: self.log_contains("mockbeat start running.")) + self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event"))) + self.wait_until(lambda: self.log_contains(re.compile( + "Connection to .*elasticsearch\("+self.get_elasticsearch_monitoring_url()+"\).* established"))) + self.wait_until(lambda: self.monitoring_doc_exists('beats_stats')) + self.wait_until(lambda: self.monitoring_doc_exists('beats_state')) + + proc.check_kill_and_wait() + + direct_beats_stats_doc = self.get_monitoring_doc('beats_stats') + direct_beats_state_doc = self.get_monitoring_doc('beats_state') + + self.assert_same_structure(indirect_beats_state_doc['beats_state'], direct_beats_state_doc['beats_state']) + self.assert_same_structure(indirect_beats_stats_doc['beats_stats'], direct_beats_stats_doc['beats_stats']) + + def search_monitoring_doc(self, monitoring_type): results = self.es_monitoring.search( index='.monitoring-beats-*', q='type:'+monitoring_type, size=1 ) - hits = results['hits']['hits'] + return results['hits']['hits'] + + def monitoring_doc_exists(self, monitoring_type): + hits = self.search_monitoring_doc(monitoring_type) return len(hits) == 1 + def get_monitoring_doc(self, monitoring_type): + hits = self.search_monitoring_doc(monitoring_type) + if len(hits) != 1: + return None + return hits[0]['_source'] + def assert_monitoring_doc_contains_fields(self, monitoring_type, field_names): results = self.es_monitoring.search( index='.monitoring-beats-*', @@ -68,9 +178,40 @@ def assert_monitoring_doc_contains_fields(self, monitoring_type, field_names): source = hits[0]['_source'] for field_name in field_names: - assert field_name in source + self.assertIn(field_name, source) + + def assert_same_structure(self, dict1, dict2): + dict1_keys = dict1.keys() + dict2_keys = dict2.keys() + + self.assertEqual(len(dict1_keys), len(dict2_keys)) + for key in dict1_keys: + dict1_val = dict1[key] + dict2_val = dict2[key] + self.assertEqual(type(dict1_val), type(dict2_val)) + if type(dict1_val) is dict: + self.assert_same_structure(dict1_val, dict2_val) + + def clean_output_cluster(self): + # Remove all exporters + self.es.cluster.put_settings(body={ + "transient": { + "xpack.monitoring.exporters.*": None + } + }) - def clean(self): + # Disable collection + self.es.cluster.put_settings(body={ + "transient": { + "xpack.monitoring.collection.enabled": None + } + }) + + def clean_monitoring_cluster(self): + # Delete any old beats monitoring data + self.es_monitoring.indices.delete(index=".monitoring-beats-*", ignore=[404]) + + def init_output_cluster(self): # Setup remote exporter self.es.cluster.put_settings(body={ "transient": { @@ -88,9 +229,6 @@ def clean(self): } }) - # Delete any old beats monitoring data - self.es_monitoring.indices.delete(index=".monitoring-beats-*", ignore=[404]) - def get_elasticsearch_monitoring_url(self): return "http://{host}:{port}".format( host=os.getenv("ES_MONITORING_HOST", "localhost"),