Skip to content

Commit

Permalink
Fix source_node logic in elasticsearch/shards xpack code (#8533)
Browse files Browse the repository at this point in the history
* Add source_node to elasticsearch/shard metricset xpack code

* Only add source_node if shard has been allocated to node
  • Loading branch information
ycombinator authored Oct 3, 2018
1 parent 2a04e2c commit 3fedc8d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
9 changes: 6 additions & 3 deletions metricbeat/module/elasticsearch/shard/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ var (
)

type stateStruct struct {
ClusterName string `json:"cluster_name"`
StateID string `json:"state_uuid"`
MasterNode string `json:"master_node"`
ClusterName string `json:"cluster_name"`
StateID string `json:"state_uuid"`
MasterNode string `json:"master_node"`
Nodes map[string]struct {
Name string `json:"name"`
} `json:"nodes"`
RoutingTable struct {
Indices map[string]struct {
Shards map[string][]map[string]interface{} `json:"shards"`
Expand Down
44 changes: 26 additions & 18 deletions metricbeat/module/elasticsearch/shard/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,13 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
return
}

nodeInfo, err := elasticsearch.GetNodeInfo(m.HTTP, m.HostData().SanitizedURI+statePath, stateData.MasterNode)
if err != nil {
return
}

// TODO: This is currently needed because the cluser_uuid is `na` in stateData in case not the full state is requested.
// Will be fixed in: https://github.com/elastic/elasticsearch/pull/30656
clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HostData().SanitizedURI+statePath, stateData.MasterNode)
if err != nil {
return
}

sourceNode := common.MapStr{
"uuid": stateData.MasterNode,
"host": nodeInfo.Host,
"transport_address": nodeInfo.TransportAddress,
"ip": nodeInfo.IP,
// This seems to be in the x-pack data a subset of the cluster_uuid not the name?
"name": stateData.ClusterName,
"timestamp": common.Time(time.Now()),
}

for _, index := range stateData.RoutingTable.Indices {
for _, shards := range index.Shards {
for _, shard := range shards {
Expand All @@ -77,17 +62,28 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
continue
}

event.RootFields = common.MapStr{}

event.RootFields = common.MapStr{
"timestamp": time.Now(),
"cluster_uuid": clusterID,
"interval_ms": m.Module().Config().Period.Nanoseconds() / 1000 / 1000,
"type": "shards",
"source_node": sourceNode,
"shard": fields,
"state_uuid": stateData.StateID,
}

// Build source_node object
nodeID, ok := shard["node"]
if !ok {
continue
}
if nodeID != nil { // shard has not been allocated yet
sourceNode, err := getSourceNode(nodeID.(string), stateData)
if err != nil {
continue
}
event.RootFields.Put("source_node", sourceNode)
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)

r.Event(event)
Expand All @@ -96,3 +92,15 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
}
}
}

func getSourceNode(nodeID string, stateData *stateStruct) (common.MapStr, error) {
nodeInfo, ok := stateData.Nodes[nodeID]
if !ok {
return nil, elastic.MakeErrorForMissingField("nodes."+nodeID, elastic.Elasticsearch)
}

return common.MapStr{
"uuid": nodeID,
"name": nodeInfo.Name,
}, nil
}
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/shard/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func init() {
}

const (
statePath = "/_cluster/state/version,master_node,routing_table"
statePath = "/_cluster/state/version,nodes,master_node,routing_table"
)

// MetricSet type defines all fields of the MetricSet
Expand Down

0 comments on commit 3fedc8d

Please sign in to comment.