diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d34b883a6e2c..b8a0f00cb457 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -326,6 +326,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix failiures caused by custom beat names with more than 15 characters {pull}22550[22550] - Stop generating NaN values from Cloud Foundry module to avoid errors in outputs. {pull}22634[22634] - Update NATS dashboards to leverage connection and route metricsets {pull}22646[22646] +- Fix `logstash` module when `xpack.enabled: true` is set from emitting redundant events. {pull}22808[22808] *Packetbeat* diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index fbc030fa3103..abd737f3ed4d 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -57,13 +57,13 @@ type MetricSet struct { XPack bool } -type graph struct { +type Graph struct { Vertices []map[string]interface{} `json:"vertices"` Edges []map[string]interface{} `json:"edges"` } -type graphContainer struct { - Graph *graph `json:"graph,omitempty"` +type GraphContainer struct { + Graph *Graph `json:"graph,omitempty"` Type string `json:"type"` Version string `json:"version"` Hash string `json:"hash"` @@ -74,8 +74,8 @@ type PipelineState struct { ID string `json:"id"` Hash string `json:"hash"` EphemeralID string `json:"ephemeral_id"` - Graph *graphContainer `json:"graph,omitempty"` - Representation *graphContainer `json:"representation"` + Graph *GraphContainer `json:"graph,omitempty"` + Representation *GraphContainer `json:"representation"` BatchSize int `json:"batch_size"` Workers int `json:"workers"` } diff --git a/metricbeat/module/logstash/node/data_xpack.go b/metricbeat/module/logstash/node/data_xpack.go index 96fc252158c4..66d3623c7de6 100644 --- a/metricbeat/module/logstash/node/data_xpack.go +++ b/metricbeat/module/logstash/node/data_xpack.go @@ -66,21 +66,26 @@ func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClust var clusterToPipelinesMap map[string][]logstash.PipelineState clusterToPipelinesMap = make(map[string][]logstash.PipelineState) + if overrideClusterUUID != "" { + clusterToPipelinesMap[overrideClusterUUID] = pipelines + return clusterToPipelinesMap + } + for _, pipeline := range pipelines { - var clusterUUIDs []string + clusterUUIDs := common.StringSet{} for _, vertex := range pipeline.Graph.Graph.Vertices { clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID) if clusterUUID != "" { - clusterUUIDs = append(clusterUUIDs, clusterUUID) + clusterUUIDs.Add(clusterUUID) } } // If no cluster UUID was found in this pipeline, assign it a blank one if len(clusterUUIDs) == 0 { - clusterUUIDs = []string{""} + clusterUUIDs.Add("") } - for _, clusterUUID := range clusterUUIDs { + for clusterUUID := range clusterUUIDs { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { clusterToPipelinesMap[clusterUUID] = []logstash.PipelineState{} diff --git a/metricbeat/module/logstash/node/data_xpack_test.go b/metricbeat/module/logstash/node/data_xpack_test.go new file mode 100644 index 000000000000..17ae0aaaf912 --- /dev/null +++ b/metricbeat/module/logstash/node/data_xpack_test.go @@ -0,0 +1,328 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package node + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/metricbeat/module/logstash" +) + +func TestMakeClusterToPipelinesMap(t *testing.T) { + tests := map[string]struct { + pipelines []logstash.PipelineState + overrideClusterUUID string + expectedMap map[string][]logstash.PipelineState + }{ + "no_vertex_cluster_id": { + pipelines: []logstash.PipelineState{ + { + ID: "test_pipeline", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]logstash.PipelineState{ + "prod_cluster_id": { + { + ID: "test_pipeline", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + }, + }, + "one_vertex_cluster_id": { + pipelines: []logstash.PipelineState{ + { + ID: "test_pipeline", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]logstash.PipelineState{ + "prod_cluster_id": { + { + ID: "test_pipeline", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + }, + }, + "two_pipelines": { + pipelines: []logstash.PipelineState{ + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + { + ID: "test_pipeline_2", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]logstash.PipelineState{ + "prod_cluster_id": { + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + { + ID: "test_pipeline_2", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + }, + }, + }, + }, + "no_override_cluster_id": { + pipelines: []logstash.PipelineState{ + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + { + ID: "test_pipeline_2", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + }, + }, + overrideClusterUUID: "", + expectedMap: map[string][]logstash.PipelineState{ + "es_1": { + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + }, + "es_2": { + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + }, + "": { + { + ID: "test_pipeline_2", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + }, + }, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actualMap := makeClusterToPipelinesMap(test.pipelines, test.overrideClusterUUID) + require.Equal(t, test.expectedMap, actualMap) + }) + } +} diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index a6a9867b7cde..e5d82365b534 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -219,20 +219,20 @@ func makeClusterToPipelinesMap(pipelines []PipelineStats, overrideClusterUUID st } for _, pipeline := range pipelines { - var clusterUUIDs []string + clusterUUIDs := common.StringSet{} for _, vertex := range pipeline.Vertices { clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID) if clusterUUID != "" { - clusterUUIDs = append(clusterUUIDs, clusterUUID) + clusterUUIDs.Add(clusterUUID) } } // If no cluster UUID was found in this pipeline, assign it a blank one if len(clusterUUIDs) == 0 { - clusterUUIDs = []string{""} + clusterUUIDs.Add("") } - for _, clusterUUID := range clusterUUIDs { + for clusterUUID := range clusterUUIDs { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { clusterToPipelinesMap[clusterUUID] = []PipelineStats{} diff --git a/metricbeat/module/logstash/node_stats/data_xpack_test.go b/metricbeat/module/logstash/node_stats/data_xpack_test.go new file mode 100644 index 000000000000..6593be725347 --- /dev/null +++ b/metricbeat/module/logstash/node_stats/data_xpack_test.go @@ -0,0 +1,273 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package node_stats + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMakeClusterToPipelinesMap(t *testing.T) { + tests := map[string]struct { + pipelines []PipelineStats + overrideClusterUUID string + expectedMap map[string][]PipelineStats + }{ + "no_vertex_cluster_id": { + pipelines: []PipelineStats{ + { + ID: "test_pipeline", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]PipelineStats{ + "prod_cluster_id": { + { + ID: "test_pipeline", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + "one_vertex_cluster_id": { + pipelines: []PipelineStats{ + { + ID: "test_pipeline", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]PipelineStats{ + "prod_cluster_id": { + { + ID: "test_pipeline", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + "two_pipelines": { + pipelines: []PipelineStats{ + { + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + { + ID: "test_pipeline_2", + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]PipelineStats{ + "prod_cluster_id": { + { + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + { + ID: "test_pipeline_2", + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + }, + }, + "no_override_cluster_id": { + pipelines: []PipelineStats{ + { + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + { + ID: "test_pipeline_2", + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + expectedMap: map[string][]PipelineStats{ + "es_1": { + { + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + "es_2": { + { + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + "": { + { + ID: "test_pipeline_2", + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actualMap := makeClusterToPipelinesMap(test.pipelines, test.overrideClusterUUID) + require.Equal(t, test.expectedMap, actualMap) + }) + } +}