Skip to content

Commit

Permalink
Fix for consistent error handling of consequetive errors
Browse files Browse the repository at this point in the history
  • Loading branch information
zalegrala committed Jul 15, 2024
1 parent da9a2a4 commit e642282
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
45 changes: 35 additions & 10 deletions tempodb/blocklist/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,15 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
blocklist = PerTenant{}
compactedBlocklist = PerTenantCompacted{}

finalErr atomic.Error
consecutiveErrors int
finalErr atomic.Error
pollErrs = make([]error, len(tenants))
)

for _, tenantID := range tenants {
// The finalErr is only stored when the number of consecutive errors exceeds the tolerance. If present, return it.
for i, tenantID := range tenants {
// The finalErr is only stored when the number of consecutive errors
// exceeds the tolerance. If present, return it.
if err := finalErr.Load(); err != nil {
level.Error(p.logger).Log("msg", "exiting polling loop early because too many errors", "errCount", consecutiveErrors)
level.Error(p.logger).Log("msg", "exiting polling loop early because too many errors")
return nil, nil, err
}

Expand All @@ -177,17 +178,21 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
defer mtx.Unlock()
if err != nil {
level.Error(p.logger).Log("msg", "failed to poll or create index for tenant", "tenant", tenantID, "err", err)
consecutiveErrors++
blocklist[tenantID] = previous.Metas(tenantID)
compactedBlocklist[tenantID] = previous.CompactedMetas(tenantID)
if consecutiveErrors > p.cfg.TolerateConsecutiveErrors {
finalErr.Store(err)

// Store the error in the slice at the position of this tenant. This
// allows us to check for consecutive errors.
pollErrs[i] = err
pollErr := pollError(pollErrs, p.cfg.TolerateConsecutiveErrors)
if pollErr != nil {
finalErr.Store(pollErr)
return
}

return
}

consecutiveErrors = 0
if len(newBlockList) > 0 || len(newCompactedBlockList) > 0 {
blocklist[tenantID] = newBlockList
compactedBlocklist[tenantID] = newCompactedBlockList
Expand All @@ -210,7 +215,6 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {

wg.Wait()

// Load the finalErr in case the error was on the last itteration of the tenant loop.
if err := finalErr.Load(); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -586,3 +590,24 @@ func sumTotalBackendMetaMetrics(
compactedBlockMetaTotalBytes: sumTotalBytesCBM,
}
}

// pollError checks the slice of errors to determine if there is a sequence of
// non-nil values, and returns the last one in sequence if found, or nil if no
// sequence exceeeds the max. Called under lock when concurrent.
func pollError(errs []error, maxConsecutiveErrors int) error {
max := 0

for i, e := range errs {
if e != nil {
max++
} else {
max = 0
}

if max > maxConsecutiveErrors {
return errs[i]
}
}

return nil
}
11 changes: 4 additions & 7 deletions tempodb/blocklist/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"maps"
"math/rand"
"regexp"
"sort"
"strconv"
"testing"
Expand Down Expand Up @@ -527,7 +526,7 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) {
name string
tolerate int
tenantErrors []error
expectedError *regexp.Regexp
expectedError error
}{
{
name: "no errors",
Expand All @@ -539,7 +538,7 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) {
name: "untolerated single error",
tolerate: 0,
tenantErrors: []error{nil, errors.New("tenant 1 err"), nil},
expectedError: regexp.MustCompile(`tenant 1 err`),
expectedError: errors.New("tenant 1 err"),
},
{
name: "tolerated errors",
Expand All @@ -557,7 +556,7 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) {
errors.New("tenant 3 err"),
nil,
},
expectedError: regexp.MustCompile(`tenant \d err`),
expectedError: errors.New("tenant 3 err"),
},
}

Expand Down Expand Up @@ -589,9 +588,7 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) {
if tc.expectedError != nil {
// We expect an error, but we don't know which one, so match the regex
// against the error.
require.NotNil(t, err)
matches := tc.expectedError.FindAllString(err.Error(), -1)
require.Greater(t, len(matches), 0)
assert.ErrorContains(t, err, tc.expectedError.Error())
} else {
assert.NoError(t, err)
}
Expand Down

0 comments on commit e642282

Please sign in to comment.