diff --git a/.github/workflows/check-default.yml b/.github/workflows/check-default.yml index f394815356c4..9edcf7c2654f 100644 --- a/.github/workflows/check-default.yml +++ b/.github/workflows/check-default.yml @@ -20,6 +20,11 @@ jobs: - uses: actions/setup-go@v5 with: go-version-file: .go-version + #  when using ubuntu-latest, python 3.10 is not the default version. + - name: Fix Code is not compatible with Python 3.12 + uses: actions/setup-python@v4 + with: + python-version: '3.10' - name: Run check-default run: | go install github.com/magefile/mage diff --git a/.github/workflows/check-docs.yml b/.github/workflows/check-docs.yml index a2f26979ec40..50ef425ae33a 100644 --- a/.github/workflows/check-docs.yml +++ b/.github/workflows/check-docs.yml @@ -28,6 +28,11 @@ jobs: run: sudo apt-get install -y libsystemd-dev - name: Install librpm-dev run: sudo apt-get install -y librpm-dev + #  when using ubuntu-latest, python 3.10 is not the default version. + - name: Fix Code is not compatible with Python 3.12 + uses: actions/setup-python@v4 + with: + python-version: '3.10' - name: Run check run: | make check diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4ba86d64d778..f215e8bbd6ab 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -204,6 +204,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019] - The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] - Fix Netflow Template Sharing configuration handling. {pull}42080[42080] +- Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218] *Heartbeat* @@ -237,6 +238,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Do not report non-existant 0 values for RSS metrics in docker/memory {pull}41449[41449] - Log Cisco Meraki `getDevicePerformanceScores` errors without stopping metrics collection. {pull}41622[41622] - Don't skip first bucket value in GCP metrics metricset for distribution type metrics {pull}41822[41822] +- [K8s Integration] Enhance HTTP authentication in case of token updates for Apiserver, Controllermanager and Scheduler metricsets {issue}41910[41910] {pull}42016[42016] - Fixed `creation_date` scientific notation output in the `elasticsearch.index` metricset. {pull}42053[42053] - Fix bug where metricbeat unintentionally triggers Windows ASR. {pull}42177[42177] @@ -382,6 +384,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] - Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012] - Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094] +- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225] *Auditbeat* diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go index f4e06df7e1f1..50929c128c37 100644 --- a/metricbeat/helper/prometheus/prometheus.go +++ b/metricbeat/helper/prometheus/prometheus.go @@ -35,6 +35,9 @@ const acceptHeader = `text/plain;version=0.0.4;q=0.5,*/*;q=0.1` // Prometheus helper retrieves prometheus formatted metrics type Prometheus interface { + // GetHttp returns the HTTP Client that handles the connection towards remote endpoint + GetHttp() (*helper.HTTP, error) + // GetFamilies requests metric families from prometheus endpoint and returns them GetFamilies() ([]*MetricFamily, error) @@ -66,6 +69,15 @@ func NewPrometheusClient(base mb.BaseMetricSet) (Prometheus, error) { return &prometheus{http, base.Logger()}, nil } +// GetHttp returns HTTP Client +func (p *prometheus) GetHttp() (*helper.HTTP, error) { + httpClient, ok := p.httpfetcher.(*helper.HTTP) + if !ok { + return nil, fmt.Errorf("httpfetcher is not of type *helper.HTTP") + } + return httpClient, nil +} + // GetFamilies requests metric families from prometheus endpoint and returns them func (p *prometheus) GetFamilies() ([]*MetricFamily, error) { var reader io.Reader diff --git a/metricbeat/module/kubernetes/apiserver/metricset.go b/metricbeat/module/kubernetes/apiserver/metricset.go index 9dd9a81976d2..5457093e5536 100644 --- a/metricbeat/module/kubernetes/apiserver/metricset.go +++ b/metricbeat/module/kubernetes/apiserver/metricset.go @@ -19,9 +19,14 @@ package apiserver import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -29,9 +34,11 @@ import ( // Metricset for apiserver is a prometheus based metricset type Metricset struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } var _ mb.ReportingMetricSetV2Error = (*Metricset)(nil) @@ -41,11 +48,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &Metricset{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil @@ -54,20 +73,36 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. func (m *Metricset) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) - } - isOpen := reporter.Event(event) - if !isOpen { - return nil + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } + return nil } - - return nil } diff --git a/metricbeat/module/kubernetes/controllermanager/controllermanager.go b/metricbeat/module/kubernetes/controllermanager/controllermanager.go index dbfcddc2b6be..6c7b1c8ae528 100644 --- a/metricbeat/module/kubernetes/controllermanager/controllermanager.go +++ b/metricbeat/module/kubernetes/controllermanager/controllermanager.go @@ -19,9 +19,14 @@ package controllermanager import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -74,9 +79,11 @@ func init() { // MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling. type MetricSet struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } // New create a new instance of the MetricSet @@ -87,11 +94,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &MetricSet{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil } @@ -99,19 +118,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) - } - isOpen := reporter.Event(event) - if !isOpen { - return nil + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } - } - return nil + return nil + } } diff --git a/metricbeat/module/kubernetes/scheduler/scheduler.go b/metricbeat/module/kubernetes/scheduler/scheduler.go index f512c96b7f2f..1b563ad000af 100644 --- a/metricbeat/module/kubernetes/scheduler/scheduler.go +++ b/metricbeat/module/kubernetes/scheduler/scheduler.go @@ -19,9 +19,14 @@ package scheduler import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -78,9 +83,11 @@ func init() { // MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling. type MetricSet struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } // New create a new instance of the MetricSet @@ -91,11 +98,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &MetricSet{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil } @@ -103,20 +122,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } - isOpen := reporter.Event(event) - if !isOpen { - return nil - } - } - return nil + return nil + } } diff --git a/metricbeat/module/linux/rapl/msr_test b/metricbeat/module/linux/rapl/msr_test deleted file mode 100755 index 4e5a0cb83bbf..000000000000 Binary files a/metricbeat/module/linux/rapl/msr_test and /dev/null differ diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index 7f07fb4954f6..85a7c02467af 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -316,7 +316,7 @@ This specifies whether fields should be replaced with a `*` or deleted entirely [float] ==== `retry` -The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. +The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. It also supports blanket retries and infinite retries via the `blanket_retires` and `infinite_retries` configuration options. These are set to `false` by default. ["source","yaml",subs="attributes"] ---- @@ -333,6 +333,8 @@ filebeat.inputs: max_attempts: 5 wait_min: 1s wait_max: 10s + blanket_retries: false + infinite_retries: false ---- [float] ==== `retry.max_attempts` @@ -349,6 +351,16 @@ The minimum time to wait between retries. This ensures that retries are spaced o The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. The default value is `30` seconds. +[float] +==== `retry.blanket_retries` + +Normally the input will only retry when a connection error is found to be retryable based on the error type and the RFC 6455 error codes defined by the websocket protocol. If `blanket_retries` is set to `true` (`false` by default) the input will retry on any error. This is not recommended unless the user is certain that all errors are transient and can be resolved by retrying. + +[float] +==== `retry.infinite_retries` + +Normally the input will only retry a maximum of `max_attempts` times. If `infinite_retries` is set to `true` (`false` by default) the input will retry indefinitely. This is not recommended unless the user is certain that the connection will eventually succeed. + [float] === `timeout` Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds. diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index eea8c2afc704..df557d553de2 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -59,9 +59,11 @@ type redact struct { } type retry struct { - MaxAttempts int `config:"max_attempts"` - WaitMin time.Duration `config:"wait_min"` - WaitMax time.Duration `config:"wait_max"` + MaxAttempts int `config:"max_attempts"` + WaitMin time.Duration `config:"wait_min"` + WaitMax time.Duration `config:"wait_max"` + BlanketRetries bool `config:"blanket_retries"` + InfiniteRetries bool `config:"infinite_retries"` } type authConfig struct { @@ -136,7 +138,7 @@ func (c config) Validate() error { if c.Retry != nil { switch { - case c.Retry.MaxAttempts <= 0: + case c.Retry.MaxAttempts <= 0 && !c.Retry.InfiniteRetries: return errors.New("max_attempts must be greater than zero") case c.Retry.WaitMin > c.Retry.WaitMax: return errors.New("wait_min must be less than or equal to wait_max") diff --git a/x-pack/filebeat/input/streaming/config_test.go b/x-pack/filebeat/input/streaming/config_test.go index 840c35d400fa..437267bc7b71 100644 --- a/x-pack/filebeat/input/streaming/config_test.go +++ b/x-pack/filebeat/input/streaming/config_test.go @@ -130,6 +130,18 @@ var configTests = []struct { "url": "wss://localhost:443/v1/stream", }, }, + { + name: "valid_retry_with_infinite", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "infinite_retries": true, + "max_attempts": 0, + "wait_min": "1s", + "wait_max": "2s", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, } func TestConfig(t *testing.T) { diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index c11784ea3dbf..e4a8eac1d417 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -450,7 +450,7 @@ var inputTests = []struct { "wait_max": "2s", }, }, - wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"), + wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake and (status 403)"), }, { name: "single_event_tls", diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 225bc76e8d9f..584852aabcce 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -118,7 +118,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { _, message, err := c.ReadMessage() if err != nil { s.metrics.errorsTotal.Inc() - if !isRetryableError(err) { + if !s.cfg.Retry.BlanketRetries && !isRetryableError(err) { s.log.Errorw("failed to read websocket data", "error", err) return err } @@ -176,6 +176,9 @@ func isRetryableError(err error) bool { websocket.CloseInternalServerErr, websocket.CloseTryAgainLater, websocket.CloseServiceRestart, + websocket.CloseAbnormalClosure, + websocket.CloseMessageTooBig, + websocket.CloseNoStatusReceived, websocket.CloseTLSHandshake: return true } @@ -230,21 +233,38 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log } if cfg.Retry != nil { retryConfig := cfg.Retry - for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { - conn, response, err = dialer.DialContext(ctx, url, headers) - if err == nil { - return conn, response, nil + if !retryConfig.InfiniteRetries { + for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { + conn, response, err = dialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + //nolint:errorlint // it will never be a wrapped error at this point + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + } else { + log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode) + } + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + time.Sleep(waitTime) } - //nolint:errorlint // it will never be a wrapped error at this point - if err == websocket.ErrBadHandshake { - log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) - continue + return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w and (status %d)", retryConfig.MaxAttempts, err, response.StatusCode) + } else { + for attempt := 1; ; attempt++ { + conn, response, err = dialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + //nolint:errorlint // it will never be a wrapped error at this point + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + } else { + log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode) + } + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + time.Sleep(waitTime) } - log.Debugf("attempt %d: webSocket connection failed. retrying...\n", attempt) - waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) - time.Sleep(waitTime) } - return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err) } return dialer.DialContext(ctx, url, headers)