Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Adjust State loader to only retry for failed requests and not for 4xx #37981

Merged
merged 11 commits into from
Mar 4, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ fields added to events containing the Beats version. {pull}37553[37553]

*Heartbeat*

- Fix panics when dereferencing an invalid parsed URL. {pull}34702[34702]
- Fix setuid root when running under cgroups v2. {pull}37794[37794]
- Adjust State loader to only retry when response code status is 5xx {pull}37981[37981]

*Metricbeat*

Expand Down
22 changes: 18 additions & 4 deletions heartbeat/monitors/wrappers/monitorstate/esloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@ import (

var DefaultDataStreams = "synthetics-*,heartbeat-*"

// LoaderError is the error returned by the ES state loader. It wraps the
// underlying failure and records whether the load is worth retrying.
type LoaderError struct {
	// err is the underlying failure being wrapped.
	err error
	// Retry reports whether the caller should attempt the load again.
	Retry bool
}

// Error returns the message of the wrapped error. A LoaderError that wraps
// nothing (e.g. the zero value) yields a generic message instead of panicking
// on a nil dereference.
func (e LoaderError) Error() string {
	if e.err == nil {
		return "state loader error"
	}
	return e.err.Error()
}

// Unwrap exposes the wrapped error so that errors.Is and errors.As can
// inspect the underlying cause through a LoaderError.
func (e LoaderError) Unwrap() error {
	return e.err
}

func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation *config.LocationWithID) StateLoader {
if indexPattern == "" {
// Should never happen, but if we ever make a coding error...
logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES")
return NilStateLoader
}

return func(sf stdfields.StdMonitorFields) (*State, error) {
var runFromID string
if sf.RunFrom != nil {
Expand Down Expand Up @@ -74,10 +82,11 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
},
},
}

status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody)
if err != nil || status > 299 {
return nil, fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err)
sErr := fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err)
retry := shouldRetry(status)
return nil, LoaderError{err: sErr, Retry: retry}
}

type stateHits struct {
Expand All @@ -94,7 +103,8 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
sh := stateHits{}
err = json.Unmarshal(body, &sh)
if err != nil {
return nil, fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err)
sErr := fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err)
return nil, LoaderError{err: sErr, Retry: false}
}

if len(sh.Hits.Hits) == 0 {
Expand All @@ -107,3 +117,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
return state, nil
}
}

// shouldRetry reports whether a request that ended with the given HTTP status
// code should be retried. Only statuses of 500 and above qualify.
func shouldRetry(status int) bool {
	const firstRetryableStatus = 500
	return status >= firstRetryableStatus
}
63 changes: 61 additions & 2 deletions heartbeat/monitors/wrappers/monitorstate/esloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ package monitorstate

import (
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"

Expand All @@ -33,6 +36,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/heartbeat/esutil"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/processors/util"
)

Expand All @@ -51,7 +55,7 @@ func TestStatesESLoader(t *testing.T) {

monID := etc.createTestMonitorStateInES(t, testStatus)
// Since we've continued this state it should register the initial state
ms := etc.tracker.GetCurrentState(monID)
ms := etc.tracker.GetCurrentState(monID, RetryConfig{})
require.True(t, ms.StartedAt.After(testStart.Add(-time.Nanosecond)), "timestamp for new state is off")
requireMSStatusCount(t, ms, testStatus, 1)

Expand Down Expand Up @@ -89,8 +93,61 @@ func TestStatesESLoader(t *testing.T) {
}
}

// TestMakeESLoaderError checks the Retry flag of the LoaderError returned by
// the ES loader. Each case swaps the connection's HTTP client for a fake that
// always answers with the given status code and a non-JSON body, so every
// case is expected to fail with a LoaderError.
func TestMakeESLoaderError(t *testing.T) {
	tests := []struct {
		name       string
		statusCode int
		expected   bool
	}{
		{
			name:       "should return a retryable error",
			statusCode: http.StatusInternalServerError,
			expected:   true,
		},
		{
			name:       "should not return a retryable error",
			statusCode: http.StatusNotFound,
			expected:   false,
		},
		{
			name:       "should not return a retryable error when handling malformed data",
			statusCode: http.StatusOK,
			expected:   false,
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			etc := newESTestContext(t)
			etc.ec.HTTP = fakeHTTPClient{respStatus: test.statusCode}
			loader := MakeESLoader(etc.ec, "fakeIndexPattern", etc.location)

			_, err := loader(stdfields.StdMonitorFields{})

			var loaderError LoaderError
			require.ErrorAs(t, err, &loaderError)
			// testify expects (expected, actual); keeping that order makes
			// failure output label the two values correctly.
			require.Equal(t, test.expected, loaderError.Retry)
		})
	}
}

// fakeHTTPClient is a test double for an HTTP client: it never touches the
// network and answers every request with a fixed status code.
type fakeHTTPClient struct {
	// respStatus is the status code placed on every response.
	respStatus int
}

// Do ignores the request and returns a response carrying the configured
// status code and a fixed plain-text (non-JSON) body. It never fails.
func (f fakeHTTPClient) Do(req *http.Request) (resp *http.Response, err error) {
	body := io.NopCloser(strings.NewReader("test response"))
	resp = &http.Response{
		StatusCode: f.respStatus,
		Body:       body,
	}
	return resp, nil
}

// CloseIdleConnections does nothing; the fake holds no connections.
func (f fakeHTTPClient) CloseIdleConnections() {}

type esTestContext struct {
namespace string
ec *eslegclient.Connection
esc *elasticsearch.Client
loader StateLoader
tracker *Tracker
Expand All @@ -106,10 +163,12 @@ func newESTestContext(t *testing.T) *esTestContext {
}
namespace, _ := uuid.NewV4()
esc := IntegApiClient(t)
ec := IntegES(t)
etc := &esTestContext{
namespace: namespace.String(),
esc: esc,
loader: IntegESLoader(t, fmt.Sprintf("synthetics-*-%s", namespace.String()), location),
ec: ec,
Copy link
Contributor Author

@devcorpio devcorpio Feb 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change made to make the HTTP API obj "fakeable" as you can see in esloader_test.go

loader: IntegESLoader(t, ec, fmt.Sprintf("synthetics-*-%s", namespace.String()), location),
location: location,
}

Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/wrappers/monitorstate/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (

// Helpers for tests here and elsewhere

func IntegESLoader(t *testing.T, indexPattern string, location *config.LocationWithID) StateLoader {
return MakeESLoader(IntegES(t), indexPattern, location)
func IntegESLoader(t *testing.T, esc *eslegclient.Connection, indexPattern string, location *config.LocationWithID) StateLoader {
return MakeESLoader(esc, indexPattern, location)
}

func IntegES(t *testing.T) (esc *eslegclient.Connection) {
Expand Down
34 changes: 27 additions & 7 deletions heartbeat/monitors/wrappers/monitorstate/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
t.mtx.Lock()
defer t.mtx.Unlock()

state := t.GetCurrentState(sf)
state := t.GetCurrentState(sf, RetryConfig{})
if state == nil {
state = newMonitorState(sf, newStatus, 0, t.flappingEnabled)
logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String())
Expand All @@ -75,36 +75,56 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
}

func (t *Tracker) GetCurrentStatus(sf stdfields.StdMonitorFields) StateStatus {
s := t.GetCurrentState(sf)
s := t.GetCurrentState(sf, RetryConfig{})
if s == nil {
return StatusEmpty
}
return s.Status
}

func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) {
// RetryConfig tunes how GetCurrentState retries loading a monitor's state
// from the state loader.
type RetryConfig struct {
	// attempts is the maximum number of load attempts; zero selects the
	// default of 3.
	attempts int
	// waitFn, when non-nil, supplies the pause between attempts in place of
	// the default computed sleep.
	waitFn func() time.Duration
}

func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) (state *State) {
if state, ok := t.states[sf.ID]; ok {
return state
}

tries := 3
// Default number of attempts
attempts := 3
if rc.attempts != 0 {
attempts = rc.attempts
}

var loadedState *State
var err error
for i := 0; i < tries; i++ {
var i int
for i = 0; i < attempts; i++ {
loadedState, err = t.stateLoader(sf)
if err == nil {
if loadedState != nil {
logp.L().Infof("loaded previous state for monitor %s: %s", sf.ID, loadedState.String())
}
break
}
var loaderError LoaderError
if errors.As(err, &loaderError) && !loaderError.Retry {
logp.L().Warnf("could not load last externally recorded state: %v", loaderError)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess if we wanted to use %w, we would need to unwrap the error with errors.Unwrap. Not super sure, but %v should call the Error() method.

break
}

// Default sleep time
sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond)
logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err)
if rc.waitFn != nil {
sleepFor = rc.waitFn()
}
logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %v", sleepFor.Milliseconds(), err)
time.Sleep(sleepFor)
}
if err != nil {
logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", tries, sf.ID)
logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", i+1, sf.ID)
}

if loadedState != nil {
Expand Down
49 changes: 49 additions & 0 deletions heartbeat/monitors/wrappers/monitorstate/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monitorstate

import (
"errors"
"testing"
"time"

Expand Down Expand Up @@ -131,3 +132,51 @@ func TestDeferredStateLoader(t *testing.T) {
resState, _ = dsl(stdfields.StdMonitorFields{})
require.Equal(t, stateA, resState)
}

func TestStateLoaderRetry(t *testing.T) {
// While testing the sleep time between retries should be negligible
waitFn := func() time.Duration {
return time.Microsecond
}

tests := []struct {
name string
retryable bool
rc RetryConfig
expectedCalls int
}{
{
"should retry 3 times when fails with retryable error",
Copy link
Contributor Author

@devcorpio devcorpio Feb 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test lasts 5 seconds given that time.Sleep is executed for every retry. Let me know if you want me to make this parameterizable or if you believe it's something negligible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should make it configurable.

true,
RetryConfig{waitFn: waitFn},
3,
},
{
"should not retry when fails with non-retryable error",
false,
RetryConfig{waitFn: waitFn},
1,
},
{
"should honour the configured number of attempts when fails with retryable error",
true,
RetryConfig{attempts: 5, waitFn: waitFn},
5,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
calls := 0
errorStateLoader := func(_ stdfields.StdMonitorFields) (*State, error) {
calls += 1
return nil, LoaderError{err: errors.New("test error"), Retry: tt.retryable}
}

mst := NewTracker(errorStateLoader, true)
mst.GetCurrentState(stdfields.StdMonitorFields{}, tt.rc)

require.Equal(t, calls, tt.expectedCalls)
})
}
}
Loading