Skip to content

Commit

Permalink
Add RHEL/Centos support to the system/users metricset (elastic#16902)
Browse files Browse the repository at this point in the history
* major refactor to support different systemd version

* format and updates

* update ref docs

* update ref, again

* add newline

* Fix error string

Co-Authored-By: Jaime Soriano Pastor <[email protected]>

* add changelog entry

* make update

* add build target

Co-authored-by: Jaime Soriano Pastor <[email protected]>
  • Loading branch information
fearful-symmetry and jsoriano authored Mar 12, 2020
1 parent 2a643c9 commit 1d36da7
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix detection and logging of some error cases with light modules. {pull}14706[14706]
- Fix imports after PR was merged before rebase. {pull}16756[16756]
- Add dashboard for `redisenterprise` module. {pull}16752[16752]
- Dynamically choose a method for the system/service metricset to support older linux distros. {pull}16902[16902]

*Packetbeat*

Expand Down
5 changes: 4 additions & 1 deletion metricbeat/docs/modules/system.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ metricbeat.modules:
#diskio.include_devices: []
# Filter systemd services by status or sub-status
#service.state_filter: []
#service.state_filter: ["active"]
# Filter systemd services based on a name pattern
#service.pattern_filter: ["ssh*", "nfs*"]
----

[float]
Expand Down
5 changes: 4 additions & 1 deletion metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ metricbeat.modules:
#diskio.include_devices: []

# Filter systemd services by status or sub-status
#service.state_filter: []
#service.state_filter: ["active"]

# Filter systemd services based on a name pattern
#service.pattern_filter: ["ssh*", "nfs*"]

#------------------------------ Aerospike Module ------------------------------
- module: aerospike
Expand Down
5 changes: 4 additions & 1 deletion metricbeat/module/system/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,7 @@
#diskio.include_devices: []

# Filter systemd services by status or sub-status
#service.state_filter: []
#service.state_filter: ["active"]

# Filter systemd services based on a name pattern
#service.pattern_filter: ["ssh*", "nfs*"]
1 change: 1 addition & 0 deletions metricbeat/module/system/service/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ For more information, https://www.freedesktop.org/software/systemd/man/systemd.r
=== Configuration

*`service.state_filter`* - A list of service states to filter by. This can be any of the states or sub-states known to systemd.
*`service.pattern_filter`* - A list of glob patterns to filter service names by. This is an "or" filter, and will report any systemd unit that matches at least one filter pattern.

[float]
=== Dashboard
Expand Down
182 changes: 182 additions & 0 deletions metricbeat/module/system/service/dbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//+build !netbsd

package service

import (
"encoding/xml"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/coreos/go-systemd/v22/dbus"
dbusRaw "github.com/godbus/dbus"
"github.com/pkg/errors"
)

type unitFetcher func(conn *dbus.Conn, states, patterns []string) ([]dbus.UnitStatus, error)

// instrospectForUnitMethods determines what methods are available via dbus for listing systemd units.
// We have a number of functions, some better than others, for getting and filtering unit lists.
// This will attempt to find the most optimal method, and move down to methods that require more work.
func instrospectForUnitMethods() (unitFetcher, error) {
//setup a dbus connection
conn, err := dbusRaw.SystemBusPrivate()
if err != nil {
return nil, errors.Wrap(err, "error getting connection to system bus")
}

auth := dbusRaw.AuthExternal(strconv.Itoa(os.Getuid()))
err = conn.Auth([]dbusRaw.Auth{auth})
if err != nil {
return nil, errors.Wrap(err, "error authenticating")
}

err = conn.Hello()
if err != nil {
return nil, errors.Wrap(err, "error in Hello")
}

var props string

//call "introspect" on the systemd1 path to see what ListUnit* methods are available
obj := conn.Object("org.freedesktop.systemd1", dbusRaw.ObjectPath("/org/freedesktop/systemd1"))
err = obj.Call("org.freedesktop.DBus.Introspectable.Introspect", 0).Store(&props)
if err != nil {
return nil, errors.Wrap(err, "error calling dbus")
}

unitMap, err := parseXMLAndReturnMethods(props)
if err != nil {
return nil, errors.Wrap(err, "error handling XML")
}

//return a function callback ordered by desirability
if _, ok := unitMap["ListUnitsByPatterns"]; ok {
return listUnitsByPatternWrapper, nil
} else if _, ok := unitMap["ListUnitsFiltered"]; ok {
return listUnitsFilteredWrapper, nil
} else if _, ok := unitMap["ListUnits"]; ok {
return listUnitsWrapper, nil
}
return nil, fmt.Errorf("no supported list Units function: %v", unitMap)
}

func parseXMLAndReturnMethods(str string) (map[string]bool, error) {

type Method struct {
Name string `xml:"name,attr"`
}

type Iface struct {
Name string `xml:"name,attr"`
Method []Method `xml:"method"`
}

type IntrospectData struct {
XMLName xml.Name `xml:"node"`
Interface []Iface `xml:"interface"`
}

methods := IntrospectData{}

err := xml.Unmarshal([]byte(str), &methods)
if err != nil {
return nil, errors.Wrap(err, "error unmarshalling XML")
}

if len(methods.Interface) == 0 {
return nil, errors.Wrap(err, "no methods found on introspect")
}
methodMap := make(map[string]bool)
for _, iface := range methods.Interface {
for _, method := range iface.Method {
if strings.Contains(method.Name, "ListUnits") {
methodMap[method.Name] = true
}
}
}

return methodMap, nil
}

// listUnitsByPatternWrapper is a bare wrapper for the unitFetcher type
func listUnitsByPatternWrapper(conn *dbus.Conn, states, patterns []string) ([]dbus.UnitStatus, error) {
return conn.ListUnitsByPatterns(states, patterns)
}

//listUnitsFilteredWrapper wraps the dbus ListUnitsFiltered method
func listUnitsFilteredWrapper(conn *dbus.Conn, states, patterns []string) ([]dbus.UnitStatus, error) {
units, err := conn.ListUnitsFiltered(states)
if err != nil {
return nil, errors.Wrap(err, "ListUnitsFiltered error")
}

return matchUnitPatterns(patterns, units)
}

// listUnitsWrapper wraps the dbus ListUnits method
func listUnitsWrapper(conn *dbus.Conn, states, patterns []string) ([]dbus.UnitStatus, error) {
units, err := conn.ListUnits()
if err != nil {
return nil, errors.Wrap(err, "ListUnits error")
}
if len(patterns) > 0 {
units, err = matchUnitPatterns(patterns, units)
if err != nil {
return nil, errors.Wrap(err, "error matching unit patterns")
}
}

if len(states) > 0 {
var finalUnits []dbus.UnitStatus
for _, unit := range units {
for _, state := range states {
if unit.LoadState == state || unit.ActiveState == state || unit.SubState == state {
finalUnits = append(finalUnits, unit)
break
}
}
}
return finalUnits, nil
}

return units, nil
}

// matchUnitPatterns returns a list of units that match the pattern list.
// This algo, including filepath.Match, is designed to (somewhat) emulate the behavior of ListUnitsByPatterns, which uses `fnmatch`.
func matchUnitPatterns(patterns []string, units []dbus.UnitStatus) ([]dbus.UnitStatus, error) {
var matchUnits []dbus.UnitStatus
for _, unit := range units {
for _, pattern := range patterns {
match, err := filepath.Match(pattern, unit.Name)
if err != nil {
return nil, errors.Wrapf(err, "error matching with pattern %s", pattern)
}
if match {
matchUnits = append(matchUnits, unit)
break
}
}
}
return matchUnits, nil
}
17 changes: 13 additions & 4 deletions metricbeat/module/system/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import (

// Config stores the config object
type Config struct {
StateFilter []string `config:"service.state_filter"`
StateFilter []string `config:"service.state_filter"`
PatternFilter []string `config:"service.pattern_filter"`
}

// init registers the MetricSet with the central registry as soon as the program
Expand All @@ -47,8 +48,9 @@ func init() {
// interface methods except for Fetch.
type MetricSet struct {
mb.BaseMetricSet
conn *dbus.Conn
cfg Config
conn *dbus.Conn
cfg Config
unitList unitFetcher
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
Expand All @@ -66,18 +68,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, errors.Wrap(err, "error connecting to dbus")
}

unitFunction, err := instrospectForUnitMethods()
if err != nil {
return nil, errors.Wrap(err, "error finding ListUnits Method")
}

return &MetricSet{
BaseMetricSet: base,
conn: conn,
cfg: config,
unitList: unitFunction,
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
units, err := m.conn.ListUnitsByPatterns(m.cfg.StateFilter, []string{"*.service"})

units, err := m.unitList(m.conn, m.cfg.StateFilter, append([]string{"*.service"}, m.cfg.PatternFilter...))
if err != nil {
return errors.Wrap(err, "error getting list of running units")
}
Expand Down
5 changes: 4 additions & 1 deletion x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ metricbeat.modules:
#diskio.include_devices: []

# Filter systemd services by status or sub-status
#service.state_filter: []
#service.state_filter: ["active"]

# Filter systemd services based on a name pattern
#service.pattern_filter: ["ssh*", "nfs*"]

#------------------------------- Activemq Module -------------------------------
- module: activemq
Expand Down

0 comments on commit 1d36da7

Please sign in to comment.