Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support shipping directly to monitoring cluster #9260

Merged
merged 59 commits into from
Apr 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
2d5a12b
WIP: Debugging docker-compose failure
ycombinator Feb 8, 2019
29e1e04
[WIP] Add docker compose logging for system tests as well
ycombinator Feb 8, 2019
fe07647
Removing logging statements used for debugging
ycombinator Feb 9, 2019
93ff6fd
Fixing imports
ycombinator Nov 22, 2018
8a81dcd
Introducing monitoring.* config
ycombinator Nov 22, 2018
215d9a3
Determine reporter format (production or monitoring)
ycombinator Nov 22, 2018
1e206a7
WIP: Capture elasticsearch cluster_uuid on connect to ES output?
ycombinator Nov 28, 2018
da4c13d
WIP: record cluster_uuid and then try to use it
ycombinator Nov 28, 2018
7a696e5
Extracting bulk to production code into function
ycombinator Nov 28, 2018
6d0b0e6
Add switch for sending to production or monitoring cluster
ycombinator Nov 28, 2018
27c11a7
Better if check
ycombinator Dec 11, 2018
4c196af
Using switch instead of if-else
ycombinator Dec 11, 2018
edc68e8
Add "_type": "doc" if ES version < 7
ycombinator Dec 14, 2018
eb86660
Rename format -> _format since it's for internal use only
ycombinator Dec 14, 2018
fce82bd
Rename Monitoring -> XPackMonitoring and MonitoringNew -> Monitoring
ycombinator Dec 14, 2018
c6f841c
Use consts instead of bare strings
ycombinator Dec 14, 2018
54dda1e
Move format validation check to constructor
ycombinator Dec 14, 2018
2f29a50
Collect cluster UUID when connection is made with ES output cluster
ycombinator Jan 18, 2019
8b84a93
Fix format passing + cluster UUID retrieval from registry
ycombinator Jan 18, 2019
d3787d9
Fixing error message
ycombinator Jan 18, 2019
096c7d5
Pass down format correctly
ycombinator Jan 18, 2019
34a9823
Changing log level for recurring message
ycombinator Jan 18, 2019
af85f4b
Fixing monitoring index name generation + doc fields
ycombinator Jan 21, 2019
d9993d2
Removing line from vestigial implementation
ycombinator Jan 23, 2019
98ebe0e
Removing unnecessary else
ycombinator Jan 23, 2019
7e568bf
Use iota for reporting format constants
ycombinator Jan 23, 2019
ab30bb8
Better passing of format + validation in constructor
ycombinator Jan 23, 2019
988555a
Adding CHANGELOG entry
ycombinator Feb 2, 2019
6cbd5f5
Adding system test for direct monitoring
ycombinator Feb 9, 2019
97a1d31
Refactoring
ycombinator Feb 9, 2019
0d7d291
Adding skeleton comparison test
ycombinator Feb 9, 2019
e001663
Refactoring: renaming
ycombinator Feb 9, 2019
252ec04
Refactoring
ycombinator Feb 9, 2019
1e1a380
Better cleanup
ycombinator Feb 9, 2019
8d2b706
Fixing formatting
ycombinator Feb 9, 2019
83f8d66
Fixing syntax error
ycombinator Feb 9, 2019
f0d108b
Fixing variable name in template
ycombinator Feb 9, 2019
91b9071
Fleshing out TODOs
ycombinator Feb 12, 2019
2d822b6
Make Hound happy (woof woof!)
ycombinator Feb 12, 2019
ddf9854
Fixing rebase errors
ycombinator Mar 22, 2019
8a7c1fb
Make monitoring type a string + better variable name
ycombinator Mar 22, 2019
f76b536
Change major version check
ycombinator Mar 22, 2019
6685965
Extract meta from action
ycombinator Mar 22, 2019
47a67f2
Use shorter method names
ycombinator Mar 22, 2019
dc800e3
Remove duplication of logic
ycombinator Mar 22, 2019
2059928
Refactoring: move selectMonitoringConfig to monitoring.SelectConfig
ycombinator Mar 22, 2019
a367a9f
Update schema version
ycombinator Mar 22, 2019
9b824ec
Move variable into callback fn's scope so memory is not shared across…
ycombinator Mar 26, 2019
27c7c5f
Fixing scope of registry and var creations so they don't happen on ev…
ycombinator Mar 26, 2019
3e58fe5
Remove naming stutter
ycombinator Mar 29, 2019
d8ffa49
Reduce hackiness with passing reporter format
ycombinator Mar 29, 2019
35aa290
Move error vars to package scope
ycombinator Mar 29, 2019
ba6b231
Removing stutter
ycombinator Apr 2, 2019
1ce9ef8
Adding extra godoc about format usage
ycombinator Apr 2, 2019
bf9dcb5
Extend godoc to explain the rationale behind the format hack
ycombinator Apr 2, 2019
1c13224
Debugging docker container logs for system tests
ycombinator Apr 3, 2019
cd3afe5
Remove _type from bulk request URI
ycombinator Apr 3, 2019
5943f2f
Removing debugging statement
ycombinator Apr 3, 2019
c64a59b
Fixing logic
ycombinator Apr 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- New processor: `copy_fields`. {pull}11303[11303]
- Add `error.message` to events when `fail_on_error` is set in `rename` and `copy_fields` processors. {pull}11303[11303]
- New processor: `truncate_fields`. {pull}11297[11297]
- Allow a beat to ship monitoring data directly to an Elasticsearch monitoring clsuter. {pull}9260[9260]

*Auditbeat*

Expand Down
72 changes: 63 additions & 9 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ import (
errw "github.com/pkg/errors"
"go.uber.org/zap"

sysinfo "github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
ucfg "github.com/elastic/go-ucfg"

"github.com/elastic/beats/libbeat/api"
"github.com/elastic/beats/libbeat/asset"
"github.com/elastic/beats/libbeat/beat"
Expand Down Expand Up @@ -66,9 +70,6 @@ import (
"github.com/elastic/beats/libbeat/publisher/processing"
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/version"
sysinfo "github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
ucfg "github.com/elastic/go-ucfg"
)

// Beat provides the runnable and configurable instance of a beat.
Expand Down Expand Up @@ -102,8 +103,10 @@ type beatConfig struct {
Keystore *common.Config `config:"keystore"`

// output/publishing related configurations
Pipeline pipeline.Config `config:",inline"`
Monitoring *common.Config `config:"xpack.monitoring"`
Pipeline pipeline.Config `config:",inline"`

// monitoring settings
MonitoringBeatConfig monitoring.BeatConfig `config:",inline"`

// central management settings
Management *common.Config `config:"management"`
Expand Down Expand Up @@ -284,6 +287,11 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
return nil, err
}

err = b.registerClusterUUIDFetching()
if err != nil {
return nil, err
}

reg := monitoring.Default.GetRegistry("libbeat")
if reg == nil {
reg = monitoring.Default.NewRegistry("libbeat")
Expand Down Expand Up @@ -361,11 +369,17 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
return err
}

if b.Config.Monitoring.Enabled() {
monitoringCfg, reporterSettings, err := monitoring.SelectConfig(b.Config.MonitoringBeatConfig)
if err != nil {
return err
}

if monitoringCfg.Enabled() {
settings := report.Settings{
DefaultUsername: settings.Monitoring.DefaultUsername,
Format: reporterSettings.Format,
}
reporter, err := report.New(b.Info, settings, b.Config.Monitoring, b.Config.Output)
reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output)
if err != nil {
return err
}
Expand Down Expand Up @@ -759,8 +773,7 @@ func (b *Beat) registerESIndexManagement() error {
return nil
}

// Build and return a callback to load index template into ES
func (b *Beat) indexSetupCallback() func(esClient *elasticsearch.Client) error {
func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
return func(esClient *elasticsearch.Client) error {
m := b.index.Manager(esClient, idxmgmt.BeatsAssets(b.Fields))
return m.Setup(true, true)
Expand Down Expand Up @@ -790,6 +803,47 @@ func (b *Beat) createOutput(stats outputs.Observer, cfg common.ConfigNamespace)
return outputs.Load(b.index, b.Info, stats, cfg.Name(), cfg.Config())
}

func (b *Beat) registerClusterUUIDFetching() error {
if b.Config.Output.Name() == "elasticsearch" {
callback, err := b.clusterUUIDFetchingCallback()
if err != nil {
return err
}
elasticsearch.RegisterConnectCallback(callback)
}
return nil
}

// Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring
func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, error) {
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch")
clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid")

callback := func(esClient *elasticsearch.Client) error {
var response struct {
ClusterUUID string `json:"cluster_uuid"`
}

status, body, err := esClient.Request("GET", "/", "", nil, nil)
if err != nil {
return errw.Wrap(err, "error querying /")
}
if status > 299 {
ycombinator marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("Error querying /. Status: %d. Response body: %s", status, body)
}
err = json.Unmarshal(body, &response)
if err != nil {
return fmt.Errorf("Error unmarshaling json when querying /. Body: %s", body)
}

clusterUUIDRegVar.Set(response.ClusterUUID)
return nil
}
urso marked this conversation as resolved.
Show resolved Hide resolved

return callback, nil
ycombinator marked this conversation as resolved.
Show resolved Hide resolved
}

// handleError handles the given error by logging it and then returning the
// error. If the err is nil or is a GracefulExit error then the method will
// return nil without logging anything.
Expand Down
37 changes: 36 additions & 1 deletion libbeat/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,19 @@

package monitoring

import "errors"
import (
"errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/monitoring/report"
)

// BeatConfig represents the part of the $BEAT.yml to do with monitoring settings
type BeatConfig struct {
XPackMonitoring *common.Config `config:"xpack.monitoring"`
Monitoring *common.Config `config:"monitoring"`
}

type Mode uint8

Expand All @@ -30,6 +42,11 @@ const (
Full
)

var (
errMonitoringBothConfigEnabled = errors.New("both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.elasticsearch.hosts to monitoring cluster hosts")
warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.elasticsearch.hosts to monitoring cluster hosts"
)

// Default is the global default metrics registry provided by the monitoring package.
var Default = NewRegistry()

Expand Down Expand Up @@ -67,3 +84,21 @@ func Remove(name string) {
func Clear() error {
return Default.Clear()
}

// SelectConfig selects the appropriate monitoring configuration based on the user's settings in $BEAT.yml. Users may either
// use xpack.monitoring.* settings OR monitoring.* settings but not both.
func SelectConfig(beatCfg BeatConfig) (*common.Config, *report.Settings, error) {
switch {
case beatCfg.Monitoring.Enabled() && beatCfg.XPackMonitoring.Enabled():
return nil, nil, errMonitoringBothConfigEnabled
case beatCfg.XPackMonitoring.Enabled():
cfgwarn.Deprecate("7.0", warnMonitoringDeprecatedConfig)
monitoringCfg := beatCfg.XPackMonitoring
return monitoringCfg, &report.Settings{Format: report.FormatXPackMonitoringBulk}, nil
case beatCfg.Monitoring.Enabled():
monitoringCfg := beatCfg.Monitoring
return monitoringCfg, &report.Settings{Format: report.FormatBulk}, nil
default:
return nil, nil, nil
}
}
108 changes: 90 additions & 18 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package elasticsearch
import (
"encoding/json"
"fmt"
"time"

"github.com/pkg/errors"
urso marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -34,17 +35,20 @@ import (
type publishClient struct {
es *esout.Client
params map[string]string
format report.Format
}

func newPublishClient(
es *esout.Client,
params map[string]string,
) *publishClient {
format report.Format,
) (*publishClient, error) {
p := &publishClient{
es: es,
params: params,
format: format,
}
return p
return p, nil
}

func (c *publishClient) Connect() error {
Expand Down Expand Up @@ -84,6 +88,7 @@ func (c *publishClient) Connect() error {
}

debugf("XPack monitoring is enabled")

return nil
}

Expand All @@ -97,13 +102,18 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
var reason error
for _, event := range events {

// Extract time
// Extract type
t, err := event.Content.Meta.GetValue("type")
if err != nil {
logp.Err("Type not available in monitoring reported. Please report this error: %s", err)
continue
}

typ, ok := t.(string)
if !ok {
logp.Err("monitoring type is not a string")
}

var params = map[string]string{}
// Copy params
for k, v := range c.params {
Expand All @@ -120,23 +130,13 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
}
}

meta := common.MapStr{
"_index": "",
"_routing": nil,
"_type": t,
}
bulk := [2]interface{}{
common.MapStr{"index": meta},
report.Event{
Timestamp: event.Content.Timestamp,
Fields: event.Content.Fields,
},
switch c.format {
case report.FormatXPackMonitoringBulk:
err = c.publishXPackBulk(params, event, typ)
case report.FormatBulk:
err = c.publishBulk(event, typ)
}
ycombinator marked this conversation as resolved.
Show resolved Hide resolved

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
_, err = c.es.SendMonitoringBulk(params, bulk[:])

if err != nil {
failed = append(failed, event)
reason = err
Expand All @@ -158,3 +158,75 @@ func (c *publishClient) Test(d testing.Driver) {
func (c *publishClient) String() string {
return "publish(" + c.es.String() + ")"
}

func (c *publishClient) publishXPackBulk(params map[string]string, event publisher.Event, typ string) error {
meta := common.MapStr{
"_index": "",
"_routing": nil,
"_type": typ,
}
bulk := [2]interface{}{
common.MapStr{"index": meta},
report.Event{
Timestamp: event.Content.Timestamp,
Fields: event.Content.Fields,
},
}

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
_, err := c.es.SendMonitoringBulk(params, bulk[:])
return err
}

func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
meta := common.MapStr{
"_index": getMonitoringIndexName(),
"_routing": nil,
}

if c.es.GetVersion().Major < 7 {
meta["_type"] = "doc"
}

action := common.MapStr{
"index": meta,
}
urso marked this conversation as resolved.
Show resolved Hide resolved

event.Content.Fields.Put("timestamp", event.Content.Timestamp)

fields := common.MapStr{
"type": typ,
typ: event.Content.Fields,
}
urso marked this conversation as resolved.
Show resolved Hide resolved

interval, err := event.Content.Meta.GetValue("interval_ms")
if err != nil {
return errors.Wrap(err, "could not determine interval_ms field")
}
fields.Put("interval_ms", interval)

clusterUUID, err := event.Content.Meta.GetValue("cluster_uuid")
if err != nil && err != common.ErrKeyNotFound {
return errors.Wrap(err, "could not determine cluster_uuid field")
}
fields.Put("cluster_uuid", clusterUUID)

document := report.Event{
Timestamp: event.Content.Timestamp,
Fields: fields,
}
bulk := [2]interface{}{action, document}

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
// FIXME: index name (first param below)
_, err = c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
return err
}

func getMonitoringIndexName() string {
version := 7
date := time.Now().Format("2006.01.02")
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
}
3 changes: 3 additions & 0 deletions libbeat/monitoring/report/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package elasticsearch
import (
"time"

"github.com/elastic/beats/libbeat/monitoring/report"

"github.com/elastic/beats/libbeat/common/transport/tlscommon"
)

Expand All @@ -43,6 +45,7 @@ type config struct {
BufferSize int `config:"buffer_size"`
Tags []string `config:"tags"`
Backoff backoff `config:"backoff"`
Format report.Format `config:"_format"`
}

type backoff struct {
Expand Down
Loading