Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add random startup delay to each metricset
Browse files Browse the repository at this point in the history
Add random startup delay to each metricset to avoid the thundering herd problem. Fixes elastic#4010.
andrewkroh committed Jun 14, 2017
1 parent 284a267 commit 5b5c2bd
Showing 12 changed files with 78 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
@@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
*Heartbeat*

*Metricbeat*
- Add random startup delay to each metricset to avoid the thundering herd problem. {issue}4010[4010]

*Packetbeat*

4 changes: 4 additions & 0 deletions metricbeat/_meta/common.full.yml
Original file line number Diff line number Diff line change
@@ -21,3 +21,7 @@ metricbeat.config.modules:

# Set to true to enable config reloading
reload.enabled: false

# Maximum amount of time to randomly delay the start of a metricset. Use 0 to
# disable startup delay.
metricbeat.max_start_delay: 10s
11 changes: 10 additions & 1 deletion metricbeat/beater/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package beater

import "github.com/elastic/beats/libbeat/common"
import (
"time"

"github.com/elastic/beats/libbeat/common"
)

// Config is the root of the Metricbeat configuration hierarchy.
type Config struct {
// Modules is a list of module specific configuration data.
Modules []*common.Config `config:"modules"`
ReloadModules *common.Config `config:"config.modules"`
MaxStartDelay time.Duration `config:"max_start_delay"` // Upper bound on the random startup delay for metricsets (use 0 to disable startup delay).
}

var defaultConfig = Config{
MaxStartDelay: 10 * time.Second,
}
11 changes: 4 additions & 7 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
@@ -30,14 +30,12 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// List all registered modules and metricsets.
logp.Info("%s", mb.Registry.String())

config := Config{}

err := rawConfig.Unpack(&config)
if err != nil {
config := defaultConfig
if err := rawConfig.Unpack(&config); err != nil {
return nil, errors.Wrap(err, "error reading configuration file")
}

modules, err := module.NewWrappers(config.Modules, mb.Registry)
modules, err := module.NewWrappers(config.MaxStartDelay, config.Modules, mb.Registry)
if err != nil {
// Empty config is fine if dynamic config is enabled
if !config.ReloadModules.Enabled() {
@@ -61,7 +59,6 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// that a single unresponsive host cannot inadvertently block other hosts
// within the same Module and MetricSet from collection.
func (bt *Metricbeat) Run(b *beat.Beat) error {

var wg sync.WaitGroup

for _, m := range bt.modules {
@@ -78,7 +75,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
if bt.config.ReloadModules.Enabled() {
logp.Beta("feature dynamic configuration reloading is enabled.")
moduleReloader := cfgfile.NewReloader(bt.config.ReloadModules)
factory := module.NewFactory(b.Publisher)
factory := module.NewFactory(bt.config.MaxStartDelay, b.Publisher)

go moduleReloader.Run(factory)
wg.Add(1)
Original file line number Diff line number Diff line change
@@ -22,7 +22,22 @@ metricbeat.modules:
hosts: ["root@tcp(127.0.0.1:3306)/"]
------------------------------------------------------------------------------

==== Metricbeat Options
==== General Options

===== max_start_delay

The maximum random delay to apply to the startup of a metricset. Random delays
ranging from [0, _max_start_delay_) are applied to reduce the thundering herd
effect that can occur if a fleet of machines running Metricbeat are restarted at
the same time. Specifying a value of 0 disables the startup delay. The default
is 10s.

[source,yaml]
----
metricbeat.max_start_delay: 10s
----

==== Module Options

You can specify the following options in the `metricbeat` section of the +{beatname_lc}.yml+ config file. These options
are the same for all modules. Each module may have additional configuration options that are specific to that module.
4 changes: 2 additions & 2 deletions metricbeat/mb/module/example_test.go
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ func ExampleWrapper() {
}

// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry)
m, err := module.NewWrapper(0, config, mb.Registry)
if err != nil {
fmt.Println("Error:", err)
return
@@ -97,7 +97,7 @@ func ExampleRunner() {
}

// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry)
m, err := module.NewWrapper(0, config, mb.Registry)
if err != nil {
return
}
12 changes: 8 additions & 4 deletions metricbeat/mb/module/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package module

import (
"time"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
@@ -9,18 +11,20 @@ import (

// Factory is used to register and reload modules
type Factory struct {
client func() publisher.Client
client func() publisher.Client
maxStartDelay time.Duration
}

// NewFactory creates new Reloader instance for the given config
func NewFactory(p publisher.Publisher) *Factory {
func NewFactory(maxStartDelay time.Duration, p publisher.Publisher) *Factory {
return &Factory{
client: p.Connect,
client: p.Connect,
maxStartDelay: maxStartDelay,
}
}

func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
w, err := NewWrapper(c, mb.Registry)
w, err := NewWrapper(r.maxStartDelay, c, mb.Registry)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion metricbeat/mb/module/runner_test.go
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ func TestRunner(t *testing.T) {
}

// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry)
m, err := module.NewWrapper(0, config, mb.Registry)
if err != nil {
t.Fatal(err)
}
30 changes: 22 additions & 8 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package module

import (
"fmt"
"math/rand"
"sync"
"time"

@@ -36,9 +37,10 @@ var (
// Use NewWrapper or NewWrappers to construct new Wrappers.
type Wrapper struct {
mb.Module
filters *processors.Processors
metricSets []*metricSetWrapper // List of pointers to its associated MetricSets.
configHash uint64
filters *processors.Processors
metricSets []*metricSetWrapper // List of pointers to its associated MetricSets.
configHash uint64
maxStartDelay time.Duration
}

// metricSetWrapper contains the MetricSet and the private data associated with
@@ -61,8 +63,8 @@ type stats struct {
// NewWrapper create a new Module and its associated MetricSets based
// on the given configuration. It constructs the supporting filters and stores
// them in the Wrapper.
func NewWrapper(moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) {
mws, err := NewWrappers([]*common.Config{moduleConfig}, r)
func NewWrapper(maxStartDelay time.Duration, moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) {
mws, err := NewWrappers(maxStartDelay, []*common.Config{moduleConfig}, r)
if err != nil {
return nil, err
}
@@ -77,7 +79,7 @@ func NewWrapper(moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) {
// NewWrappers creates new Modules and their associated MetricSets based
// on the given configuration. It constructs the supporting filters and stores
// them all in a Wrapper.
func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, error) {
func NewWrappers(maxStartDelay time.Duration, modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, error) {
modules, err := mb.NewModules(modulesConfig, r)
if err != nil {
return nil, err
@@ -95,8 +97,9 @@ func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, er
}

mw := &Wrapper{
Module: k,
filters: f,
Module: k,
filters: f,
maxStartDelay: maxStartDelay,
}
wrappers = append(wrappers, mw)

@@ -188,6 +191,17 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- common.MapStr)
defer logp.Recover(fmt.Sprintf("recovered from panic while fetching "+
"'%s/%s' for host '%s'", msw.module.Name(), msw.Name(), msw.Host()))

// Start each metricset randomly over a period of MaxDelayPeriod.
if msw.module.maxStartDelay > 0 {
delay := time.Duration(rand.Int63n(int64(msw.module.maxStartDelay)))
debugf("%v/%v will start after %v", msw.module.Name(), msw.Name(), delay)
select {
case <-done:
return
case <-time.After(delay):
}
}

debugf("Starting %s", msw)
defer debugf("Stopped %s", msw)

6 changes: 3 additions & 3 deletions metricbeat/mb/module/wrapper_test.go
Original file line number Diff line number Diff line change
@@ -121,7 +121,7 @@ func TestWrapperOfEventFetcher(t *testing.T) {
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
m, err := module.NewWrapper(0, c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}
@@ -154,7 +154,7 @@ func TestWrapperOfReportingFetcher(t *testing.T) {
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
m, err := module.NewWrapper(0, c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}
@@ -187,7 +187,7 @@ func TestWrapperOfPushMetricSet(t *testing.T) {
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
m, err := module.NewWrapper(0, c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}
4 changes: 4 additions & 0 deletions metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
@@ -22,6 +22,10 @@ metricbeat.config.modules:
# Set to true to enable config reloading
reload.enabled: false

# Maximum amount of time to randomly delay the start of a metricset. Use 0 to
# disable startup delay.
metricbeat.max_start_delay: 10s

#========================== Modules configuration ============================
metricbeat.modules:

3 changes: 3 additions & 0 deletions metricbeat/tests/system/config/metricbeat.yml.j2
Original file line number Diff line number Diff line change
@@ -98,6 +98,9 @@ metricbeat.config.modules:
reload.enabled: true
{% endif -%}

# Disable random start delay for metricsets.
metricbeat.max_start_delay: 0

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group

0 comments on commit 5b5c2bd

Please sign in to comment.