Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing logic to keep list of unique cluster UUIDs #22808

Merged
merged 7 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix failiures caused by custom beat names with more than 15 characters {pull}22550[22550]
- Stop generating NaN values from Cloud Foundry module to avoid errors in outputs. {pull}22634[22634]
- Update NATS dashboards to leverage connection and route metricsets {pull}22646[22646]
- Fix `logstash` module when `xpack.enabled: true` is set from emitting redundant events. {pull}22808[22808]

*Packetbeat*

Expand Down
10 changes: 5 additions & 5 deletions metricbeat/module/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ type MetricSet struct {
XPack bool
}

type graph struct {
type Graph struct {
Vertices []map[string]interface{} `json:"vertices"`
Edges []map[string]interface{} `json:"edges"`
}

type graphContainer struct {
Graph *graph `json:"graph,omitempty"`
type GraphContainer struct {
Graph *Graph `json:"graph,omitempty"`
Type string `json:"type"`
Version string `json:"version"`
Hash string `json:"hash"`
Expand All @@ -74,8 +74,8 @@ type PipelineState struct {
ID string `json:"id"`
Hash string `json:"hash"`
EphemeralID string `json:"ephemeral_id"`
Graph *graphContainer `json:"graph,omitempty"`
Representation *graphContainer `json:"representation"`
Graph *GraphContainer `json:"graph,omitempty"`
Representation *GraphContainer `json:"representation"`
BatchSize int `json:"batch_size"`
Workers int `json:"workers"`
}
Expand Down
8 changes: 4 additions & 4 deletions metricbeat/module/logstash/node/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,20 @@ func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClust
clusterToPipelinesMap = make(map[string][]logstash.PipelineState)

for _, pipeline := range pipelines {
var clusterUUIDs []string
clusterUUIDs := make(map[string]struct{}, 0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

common.StringSet

for _, vertex := range pipeline.Graph.Graph.Vertices {
clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID)
if clusterUUID != "" {
clusterUUIDs = append(clusterUUIDs, clusterUUID)
clusterUUIDs[clusterUUID] = struct{}{}
}
}

// If no cluster UUID was found in this pipeline, assign it a blank one
if len(clusterUUIDs) == 0 {
clusterUUIDs = []string{""}
clusterUUIDs[""] = struct{}{}
}

for _, clusterUUID := range clusterUUIDs {
for clusterUUID, _ := range clusterUUIDs {
clusterPipelines := clusterToPipelinesMap[clusterUUID]
if clusterPipelines == nil {
clusterToPipelinesMap[clusterUUID] = []logstash.PipelineState{}
Expand Down
57 changes: 57 additions & 0 deletions metricbeat/module/logstash/node/data_xpack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !integration

package node

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/metricbeat/module/logstash"
)

func TestMakeClusterToPipelinesMap(t *testing.T) {
pipelines := []logstash.PipelineState{
{
ID: "test_pipeline",
Graph: &logstash.GraphContainer{
Graph: &logstash.Graph{
Vertices: []map[string]interface{}{
{
"id": "vertex_1",
},
{
"id": "vertex_2",
},
{
"id": "vertex_3",
},
},
},
},
},
}
m := makeClusterToPipelinesMap(pipelines, "prod_cluster_id")
require.Len(t, m, 1)
for clusterID, pipelines := range m {
require.Equal(t, "prod_cluster_id", clusterID)
require.Len(t, pipelines, 1)
}
}
13 changes: 4 additions & 9 deletions metricbeat/module/logstash/node_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,26 +213,21 @@ func makeClusterToPipelinesMap(pipelines []PipelineStats, overrideClusterUUID st
var clusterToPipelinesMap map[string][]PipelineStats
clusterToPipelinesMap = make(map[string][]PipelineStats)

if overrideClusterUUID != "" {
clusterToPipelinesMap[overrideClusterUUID] = pipelines
return clusterToPipelinesMap
}

Copy link
Contributor Author

@ycombinator ycombinator Dec 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this section because it's also incorrect. We shouldn't be using the override cluster UUID, if set, for all pipelines so broadly. Instead we should figure out (as we do further below) the cluster UUIDs appropriate for each pipeline and build the cluster UUID => pipelines map that way.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still a change in semantics and maybe a bc breaking change.

Before this change all pipelines have had been associated with overrideClusterUUID. With this change pipelines are only associated with overrideClusterUUID iff they do no clusterUUID set. This way overrideClusterUUID actually becomes defaultClusterUUID.

Instead of removing this line, how about printing a deprecation warning if this setting is configured and introduce an a default cluster uuid setting that matches the new semantics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, looking at where the value of overrideClusterUUID comes from ultimately, it comes from the monitoring.cluster_uuid setting in logstash.yml: https://www.elastic.co/guide/en/logstash/current/monitoring-with-metricbeat.html#define-cluster__uuid.

Reading the documentation for that setting:

To bind the metrics of Logstash to a specific cluster, optionally define ...

This makes me think, if this setting is set and therefore overrideClusterUUID is set, we should make that the cluster UUID for all pipelines, ignoring what Elasticsearch clusters each pipeline might individually be talking to.

So I'm going to revert this change and, in fact, add a similar code block in the makeClusterToPipelinesMap function in the node metricset.

for _, pipeline := range pipelines {
var clusterUUIDs []string
clusterUUIDs := make(map[string]struct{}, 0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is common.StringSet.

The overall data expension starts by converting the map[string]pipelineState into a []pipelineState. Would it make sense to keep the map instead, to reduce the chance of creating duplicate entries in general?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed the code to use common.StringSet in 4cb337f but I didn't follow this:

The overall data expension starts by converting the map[string]pipelineState into a []pipelineState. Would it make sense to keep the map instead, to reduce the chance of creating duplicate entries in general?

I think it's because I'm not finding the map[string]pipelineState you mentioned. Where are you seeing that?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do the conversion here.
I was wondering if it is necessary, or maybe can even lead to similar problems (in the future) if we are not careful about not producing duplicates.

for _, vertex := range pipeline.Vertices {
clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID)
if clusterUUID != "" {
clusterUUIDs = append(clusterUUIDs, clusterUUID)
clusterUUIDs[clusterUUID] = struct{}{}
}
}

// If no cluster UUID was found in this pipeline, assign it a blank one
if len(clusterUUIDs) == 0 {
clusterUUIDs = []string{""}
clusterUUIDs[""] = struct{}{}
}

for _, clusterUUID := range clusterUUIDs {
for clusterUUID, _ := range clusterUUIDs {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit clusterUUID := range clusterUUIDS { ... }

clusterPipelines := clusterToPipelinesMap[clusterUUID]
if clusterPipelines == nil {
clusterToPipelinesMap[clusterUUID] = []PipelineStats{}
Expand Down
52 changes: 52 additions & 0 deletions metricbeat/module/logstash/node_stats/data_xpack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !integration

package node_stats

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestMakeClusterToPipelinesMap(t *testing.T) {
pipelines := []PipelineStats{
{
ID: "test_pipeline",
Vertices: []map[string]interface{}{
{
"id": "vertex_1",
},
{
"id": "vertex_2",
},
{
"id": "vertex_3",
},
},
},
}
Copy link

@urso urso Dec 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this particular case able to reproduce the issue we have had? If not, we should add one.

The expannsion into the cluster map is somewhat critical in order to produce correct docs, that show the correct association of configurations to single Elasticsearch clusters. Can we make this test more exhaustive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the case that was problematic. Before this PR this case would've produced 3 PipelineState associated with the prod_cluster_id cluster UUID when we should've only got 1 PipelineState.

I will expand this test to add a few more test cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more test cases in 672ee5e.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!


m := makeClusterToPipelinesMap(pipelines, "prod_cluster_id")
require.Len(t, m, 1)
for clusterID, pipelines := range m {
require.Equal(t, "prod_cluster_id", clusterID)
require.Len(t, pipelines, 1)
}
}