diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ddf027734643..e4a067b58474 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -212,6 +212,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923] - Add collecting AuroraDB metrics in rds metricset. {issue}14142[14142] {pull}16004[16004] - Reuse connections in SQL module. {pull}16001[16001] +- Improve the `logstash` module (when `xpack.enabled` is set to `true`) to use the override `cluster_uuid` returned by Logstash APIs. {issue}15772[15772] {pull}15795[15795] *Packetbeat* diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index 5f221b021775..d53933421243 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -134,20 +134,24 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { }, nil } -// GetPipelines returns the list of pipelines running on a Logstash node -func GetPipelines(m *MetricSet) ([]PipelineState, error) { +// GetPipelines returns the list of pipelines running on a Logstash node and, +// optionally, an override cluster UUID. +func GetPipelines(m *MetricSet) ([]PipelineState, string, error) { content, err := fetchPath(m.HTTP, "_node/pipelines", "graph=true") if err != nil { - return nil, errors.Wrap(err, "could not fetch node pipelines") + return nil, "", errors.Wrap(err, "could not fetch node pipelines") } pipelinesResponse := struct { + Monitoring struct { + ClusterID string `json:"cluster_uuid"` + } `json:"monitoring"` Pipelines map[string]PipelineState `json:"pipelines"` }{} err = json.Unmarshal(content, &pipelinesResponse) if err != nil { - return nil, errors.Wrap(err, "could not parse node pipelines response") + return nil, "", errors.Wrap(err, "could not parse node pipelines response") } var pipelines []PipelineState @@ -156,7 +160,7 @@ func GetPipelines(m *MetricSet) ([]PipelineState, error) { pipelines = append(pipelines, pipeline) } - return pipelines, nil + return pipelines, pipelinesResponse.Monitoring.ClusterID, nil } // CheckPipelineGraphAPIsAvailable returns an error if pipeline graph APIs are not @@ -177,6 +181,27 @@ func (m *MetricSet) CheckPipelineGraphAPIsAvailable() error { return nil } +// GetVertexClusterUUID returns the correct cluster UUID value for the given Elasticsearch +// vertex from a Logstash pipeline. If the vertex has no cluster UUID associated with it, +// the given override cluster UUID is returned. +func GetVertexClusterUUID(vertex map[string]interface{}, overrideClusterUUID string) string { + c, ok := vertex["cluster_uuid"] + if !ok { + return overrideClusterUUID + } + + clusterUUID, ok := c.(string) + if !ok { + return overrideClusterUUID + } + + if clusterUUID == "" { + return overrideClusterUUID + } + + return clusterUUID +} + func (m *MetricSet) getVersion() (*common.Version, error) { const rootPath = "/" content, err := fetchPath(m.HTTP, rootPath, "") diff --git a/metricbeat/module/logstash/logstash_test.go b/metricbeat/module/logstash/logstash_test.go new file mode 100644 index 000000000000..73ec193eed66 --- /dev/null +++ b/metricbeat/module/logstash/logstash_test.go @@ -0,0 +1,59 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logstash + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetVertexClusterUUID(t *testing.T) { + tests := map[string]struct { + vertex map[string]interface{} + overrideClusterUUID string + expectedClusterUUID string + }{ + "vertex_and_override": { + map[string]interface{}{ + "cluster_uuid": "v", + }, + "o", + "v", + }, + "vertex_only": { + vertex: map[string]interface{}{ + "cluster_uuid": "v", + }, + expectedClusterUUID: "v", + }, + "override_only": { + overrideClusterUUID: "o", + expectedClusterUUID: "o", + }, + "none": { + expectedClusterUUID: "", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.Equal(t, test.expectedClusterUUID, GetVertexClusterUUID(test.vertex, test.overrideClusterUUID)) + }) + } +} diff --git a/metricbeat/module/logstash/node/data_xpack.go b/metricbeat/module/logstash/node/data_xpack.go index 26634e3ed37d..c076cf3a41ae 100644 --- a/metricbeat/module/logstash/node/data_xpack.go +++ b/metricbeat/module/logstash/node/data_xpack.go @@ -26,9 +26,9 @@ import ( "github.com/elastic/beats/metricbeat/module/logstash" ) -func eventMappingXPack(r mb.ReporterV2, m *MetricSet, pipelines []logstash.PipelineState) error { +func eventMappingXPack(r mb.ReporterV2, m *MetricSet, pipelines []logstash.PipelineState, overrideClusterUUID string) error { pipelines = getUserDefinedPipelines(pipelines) - clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines) + clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines, overrideClusterUUID) for clusterUUID, pipelines := range clusterToPipelinesMap { for _, pipeline := range pipelines { removeClusterUUIDsFromPipeline(pipeline) @@ -62,24 +62,17 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, pipelines []logstash.Pipel return nil } -func makeClusterToPipelinesMap(pipelines []logstash.PipelineState) map[string][]logstash.PipelineState { +func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClusterUUID string) map[string][]logstash.PipelineState { var clusterToPipelinesMap map[string][]logstash.PipelineState clusterToPipelinesMap = make(map[string][]logstash.PipelineState) for _, pipeline := range pipelines { var clusterUUIDs []string for _, vertex := range pipeline.Graph.Graph.Vertices { - c, ok := vertex["cluster_uuid"] - if !ok { - continue - } - - clusterUUID, ok := c.(string) - if !ok { - continue + clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID) + if clusterUUID != "" { + clusterUUIDs = append(clusterUUIDs, clusterUUID) } - - clusterUUIDs = append(clusterUUIDs, clusterUUID) } // If no cluster UUID was found in this pipeline, assign it a blank one diff --git a/metricbeat/module/logstash/node/node.go b/metricbeat/module/logstash/node/node.go index 75b4a3bc4f18..e076e990ac24 100644 --- a/metricbeat/module/logstash/node/node.go +++ b/metricbeat/module/logstash/node/node.go @@ -74,13 +74,13 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return eventMapping(r, content) } - pipelinesContent, err := logstash.GetPipelines(m.MetricSet) + pipelinesContent, overrideClusterUUID, err := logstash.GetPipelines(m.MetricSet) if err != nil { m.Logger().Error(err) return nil } - err = eventMappingXPack(r, m, pipelinesContent) + err = eventMappingXPack(r, m, pipelinesContent, overrideClusterUUID) if err != nil { m.Logger().Error(err) } diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index ba398ac11f2f..4d9e071f4009 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -21,6 +21,8 @@ import ( "encoding/json" "time" + "github.com/elastic/beats/metricbeat/module/logstash" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" @@ -94,6 +96,9 @@ type nodeInfo struct { Status string `json:"status"` HTTPAddress string `json:"http_address"` Pipeline pipeline `json:"pipeline"` + Monitoring struct { + ClusterID string `json:"cluster_uuid"` + } `json:"monitoring"` } type reloads struct { @@ -166,7 +171,7 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { } pipelines = getUserDefinedPipelines(pipelines) - clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines) + clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines, nodeStats.Monitoring.ClusterID) for clusterUUID, clusterPipelines := range clusterToPipelinesMap { logstashStats := LogstashStats{ @@ -197,24 +202,22 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { return nil } -func makeClusterToPipelinesMap(pipelines []PipelineStats) map[string][]PipelineStats { +func makeClusterToPipelinesMap(pipelines []PipelineStats, overrideClusterUUID string) map[string][]PipelineStats { var clusterToPipelinesMap map[string][]PipelineStats clusterToPipelinesMap = make(map[string][]PipelineStats) + if overrideClusterUUID != "" { + clusterToPipelinesMap[overrideClusterUUID] = pipelines + return clusterToPipelinesMap + } + for _, pipeline := range pipelines { var clusterUUIDs []string for _, vertex := range pipeline.Vertices { - c, ok := vertex["cluster_uuid"] - if !ok { - continue + clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID) + if clusterUUID != "" { + clusterUUIDs = append(clusterUUIDs, clusterUUID) } - - clusterUUID, ok := c.(string) - if !ok { - continue - } - - clusterUUIDs = append(clusterUUIDs, clusterUUID) } // If no cluster UUID was found in this pipeline, assign it a blank one