diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ffe8147ca1d..3360e57de2f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -200,6 +200,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Heartbeat* +- Users can now configure max scheduler job limits per monitor type via env var. {pull}34307[34307] - Remove host and port matching restrictions on hint-generated monitors. {pull}34376[34376] diff --git a/heartbeat/autodiscover/builder/hints/monitors_test.go b/heartbeat/autodiscover/builder/hints/monitors_test.go index d4181b0cdd5..935c11cad76 100644 --- a/heartbeat/autodiscover/builder/hints/monitors_test.go +++ b/heartbeat/autodiscover/builder/hints/monitors_test.go @@ -18,6 +18,7 @@ package hints import ( + "sort" "testing" "github.com/stretchr/testify/assert" @@ -96,7 +97,7 @@ func TestGenerateHints(t *testing.T) { result: mapstr.M{ "type": "http", "schedule": "@every 5s", - "hosts": []interface{}{"1.2.3.4:8888", "1.2.3.4:9090"}, + "hosts": []string{"1.2.3.4:8888", "1.2.3.4:9090"}, }, }, { @@ -137,7 +138,7 @@ func TestGenerateHints(t *testing.T) { len: 1, result: mapstr.M{ "type": "http", - "hosts": []interface{}{"1.2.3.4:9090"}, + "hosts": []string{"1.2.3.4:9090"}, "schedule": "@every 5s", "processors": []interface{}{ map[string]interface{}{ @@ -170,7 +171,7 @@ func TestGenerateHints(t *testing.T) { result: mapstr.M{ "type": "http", "schedule": "@every 5s", - "hosts": []interface{}{"1.2.3.4:8888", "1.2.3.4:9090"}, + "hosts": []string{"1.2.3.4:8888", "1.2.3.4:9090"}, }, }, { @@ -209,6 +210,17 @@ func TestGenerateHints(t *testing.T) { err := cfgs[0].Unpack(&config) assert.Nil(t, err, test.message) + // Autodiscover can return configs with different sort orders here, which is irrelevant + // To make tests pass consistently we sort the host list + hostStrs := []string{} + if hostsSlice, ok := config["hosts"].([]interface{}); ok && len(hostsSlice) > 0 { + for _, hi := range hostsSlice { + hostStrs = append(hostStrs, hi.(string)) + } + sort.Strings(hostStrs) + config["hosts"] = hostStrs + } + assert.Equal(t, test.result, config, test.message) } diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 7845559a64f..6fb8fb69102 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -49,7 +49,7 @@ type Heartbeat struct { done chan struct{} stopOnce sync.Once // config is used for iterating over elements of the config. - config config.Config + config *config.Config scheduler *scheduler.Scheduler monitorReloader *cfgfile.Reloader monitorFactory *monitors.RunnerFactory @@ -59,7 +59,7 @@ type Heartbeat struct { // New creates a new heartbeat. func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { - parsedConfig := config.DefaultConfig + parsedConfig := config.DefaultConfig() if err := rawConfig.Unpack(&parsedConfig); err != nil { return nil, fmt.Errorf("error reading config file: %w", err) } diff --git a/heartbeat/config/config.go b/heartbeat/config/config.go index d6f33bd6a4d..a92ca0d3b1d 100644 --- a/heartbeat/config/config.go +++ b/heartbeat/config/config.go @@ -21,9 +21,15 @@ package config import ( + "fmt" + "os" + "strconv" + "strings" + "github.com/elastic/beats/v7/libbeat/autodiscover" "github.com/elastic/beats/v7/libbeat/processors/util" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) type LocationWithID struct { @@ -53,10 +59,28 @@ type Scheduler struct { } // DefaultConfig is the canonical instantiation of Config. -var DefaultConfig = Config{ - Jobs: map[string]*JobLimit{ - "browser": { - Limit: 2, - }, - }, +func DefaultConfig() *Config { + limits := map[string]*JobLimit{ + "browser": {Limit: 2}, + } + + // Read the env key SYNTHETICS_LIMIT_{TYPE} for each type of monitor to set scaling limits + // hard coded list of types to avoid cycles in current plugin system. + // TODO: refactor plugin system to DRY this up + for _, t := range []string{"http", "tcp", "icmp", "browser"} { + envKey := fmt.Sprintf("SYNTHETICS_LIMIT_%s", strings.ToUpper(t)) + if limitStr := os.Getenv(envKey); limitStr != "" { + tLimitVal, err := strconv.ParseInt(limitStr, 10, 64) + if err != nil { + logp.L().Warnf("Could not parse job limit env var %s with value '%s' as integer", envKey, limitStr) + continue + } + + limits[t] = &JobLimit{Limit: tLimitVal} + } + } + + return &Config{ + Jobs: limits, + } } diff --git a/heartbeat/config/config_test.go b/heartbeat/config/config_test.go index 9b529728b45..c782c7a6d9c 100644 --- a/heartbeat/config/config_test.go +++ b/heartbeat/config/config_test.go @@ -15,7 +15,83 @@ // specific language governing permissions and limitations // under the License. -//go:build !integration -// +build !integration - package config + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDefaults(t *testing.T) { + cases := []struct { + Name string + EnvKey string + EnvVal string + LimitType string + LimitVal int64 + }{ + { + "Browser monitor override", + "SYNTHETICS_LIMIT_BROWSER", + "123", + "browser", + 123, + }, + { + "Browser default is 2 when other monitor is overridden", + "SYNTHETICS_LIMIT_HTTP", + "123", + "browser", + 2, + }, + { + "Browser default is 2 when nothing is overridden", + "FOO", + "bar", + "browser", + 2, + }, + { + "Browser default is 2 when bad value passed", + "SYNTHETICS_LIMIT_BROWSER", + "bar", + "browser", + 2, + }, + { + "HTTP monitor override", + "SYNTHETICS_LIMIT_HTTP", + "456", + "http", + 456, + }, + { + "TCP monitor override", + "SYNTHETICS_LIMIT_TCP", + "789", + "tcp", + 789, + }, + { + "ICMP monitor override", + "SYNTHETICS_LIMIT_ICMP", + "911", + "icmp", + 911, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + os.Setenv(c.EnvKey, c.EnvVal) + defer os.Unsetenv(c.EnvKey) + + dc := DefaultConfig() + require.NotNil(t, dc.Jobs[c.LimitType]) + assert.Equal(t, dc.Jobs[c.LimitType].Limit, c.LimitVal) + }) + } +} diff --git a/heartbeat/docs/heartbeat-scheduler.asciidoc b/heartbeat/docs/heartbeat-scheduler.asciidoc index 7df831f7a27..46d78948e56 100644 --- a/heartbeat/docs/heartbeat-scheduler.asciidoc +++ b/heartbeat/docs/heartbeat-scheduler.asciidoc @@ -60,3 +60,5 @@ heartbeat.jobs: In the example, at any given time {beatname_uc} guarantees that only 10 concurrent `http` tasks and only 5 concurrent `browser` tasks will be active. + +These limits can also be set via the environment variables `SYNTHETICS_LIMIT_{TYPE}`, where `{TYPE}` is one of `BROWSER`, `HTTP`, `TCP`, and `ICMP`. \ No newline at end of file diff --git a/heartbeat/monitors/active/dialchain/dialers.go b/heartbeat/monitors/active/dialchain/dialers.go index 4163adb34d5..7889ac82a27 100644 --- a/heartbeat/monitors/active/dialchain/dialers.go +++ b/heartbeat/monitors/active/dialchain/dialers.go @@ -39,12 +39,12 @@ import ( // // The dialer will update the active events with: // -// { -// "tcp": { -// "port": ..., -// "rtt": { "connect": { "us": ... }} -// } -// } +// { +// "tcp": { +// "port": ..., +// "rtt": { "connect": { "us": ... }} +// } +// } func TCPDialer(to time.Duration) NetDialer { return CreateNetDialer(to) } @@ -56,12 +56,12 @@ func TCPDialer(to time.Duration) NetDialer { // // The dialer will update the active events with: // -// { -// "udp": { -// "port": ..., -// "rtt": { "connect": { "us": ... }} -// } -// } +// { +// "udp": { +// "port": ..., +// "rtt": { "connect": { "us": ... }} +// } +// } func UDPDialer(to time.Duration) NetDialer { return CreateNetDialer(to) } @@ -106,13 +106,13 @@ func CreateNetDialer(timeout time.Duration) NetDialer { } end := time.Now() - eventext.MergeEventFields(event, mapstr.M{ - namespace: mapstr.M{ - "rtt": mapstr.M{ - "connect": look.RTT(end.Sub(start)), - }, + ef := mapstr.M{} + ef[namespace] = mapstr.M{ + "rtt": mapstr.M{ + "connect": look.RTT(end.Sub(start)), }, - }) + } + eventext.MergeEventFields(event, ef) return conn, nil }), nil diff --git a/heartbeat/monitors/active/dialchain/socks5.go b/heartbeat/monitors/active/dialchain/socks5.go index a3e87c7edaf..8d6dc00a0a8 100644 --- a/heartbeat/monitors/active/dialchain/socks5.go +++ b/heartbeat/monitors/active/dialchain/socks5.go @@ -30,11 +30,11 @@ import ( // // The layer will update the active event with: // -// { -// "socks5": { -// "rtt": { "connect": { "us": ... }} -// } -// } +// { +// "socks5": { +// "rtt": { "connect": { "us": ... }} +// } +// } func SOCKS5Layer(config *transport.ProxyConfig) Layer { return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) { var timer timer diff --git a/heartbeat/monitors/active/http/check.go b/heartbeat/monitors/active/http/check.go index 36913fb130e..5ec83564500 100644 --- a/heartbeat/monitors/active/http/check.go +++ b/heartbeat/monitors/active/http/check.go @@ -179,7 +179,9 @@ func parseBody(b interface{}) (positiveMatch, negativeMatch []match.Matcher, err return positiveMatch, negativeMatch, errBodyIllegalBody } -/* checkBody accepts 2 check types: +/* + checkBody accepts 2 check types: + 1. positive 2. negative So, there are 4 kinds of scenarios: diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index efc3fe15c57..44cb9192624 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -167,7 +167,7 @@ func respondingHTTPBodyChecks(body string) validator.Validator { func respondingHTTPHeaderChecks() validator.Validator { return lookslike.MustCompile(map[string]interface{}{ "http.response.headers": map[string]interface{}{ - "Date": isdef.IsString, + "Date": isdef.Optional(isdef.IsString), "Content-Length": isdef.Optional(isdef.IsString), "Content-Type": isdef.Optional(isdef.IsString), "Location": isdef.Optional(isdef.IsString), @@ -261,12 +261,12 @@ func TestUpStatuses(t *testing.T) { testslike.Test( t, - lookslike.Strict(lookslike.Compose( + lookslike.Compose( hbtest.BaseChecks("127.0.0.1", "up", "http"), hbtest.RespondingTCPChecks(), hbtest.SummaryChecks(1, 0), respondingHTTPChecks(server.URL, "text/plain; charset=utf-8", status), - )), + ), event.Fields, ) }) @@ -423,7 +423,7 @@ func TestJsonBody(t *testing.T) { if tc.expression != "" { jsonCheck["expression"] = tc.expression } - if tc.condition != nil { + if len(tc.condition) > 0 { jsonCheck["condition"] = tc.condition } @@ -687,7 +687,7 @@ func TestRedirect(t *testing.T) { testslike.Test( t, - lookslike.Strict(lookslike.Compose( + lookslike.Compose( hbtest.BaseChecks("", "up", "http"), hbtest.SummaryChecks(1, 0), minimalRespondingHTTPChecks(testURL, "text/plain; charset=utf-8", 200), @@ -701,7 +701,7 @@ func TestRedirect(t *testing.T) { server.URL + redirectingPaths["/redirect_two"], }, }), - )), + ), event.Fields, ) }