Skip to content

Commit

Permalink
#7514 Fix channel deadlock in meta sync fetcher
Browse files Browse the repository at this point in the history
Errors no longer take out the thread with them, instead are collected into a multierror.

Signed-off-by: Miklós Földényi <[email protected]>
  • Loading branch information
mfoldenyi committed Jan 10, 2025
1 parent 4ba0ba4 commit 6b73501
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed

- [#7978](https://github.com/thanos-io/thanos/pull/7978) Receive: Fix deadlock during local writes when `split-tenant-label-name` is used
- [#7933](https://github.com/thanos-io/thanos/pull/7933) *: Fix channel deadlock in meta sync fetcher
- [#8016](https://github.com/thanos-io/thanos/pull/8016) Query Frontend: Fix @ modifier not being applied correctly on sub queries.

### Added
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require github.com/hashicorp/go-multierror v1.1.1

require (
cloud.google.com/go v0.115.1 // indirect
cloud.google.com/go/auth v0.9.3 // indirect
Expand Down Expand Up @@ -181,6 +183,7 @@ require (
github.com/gorilla/mux v1.8.0 // indirect
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect
github.com/jcchavezs/porto v0.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,7 @@ github.com/hashicorp/consul/api v1.29.4 h1:P6slzxDLBOxUSj3fWo2o65VuKtbtOXFi7TSSg
github.com/hashicorp/consul/api v1.29.4/go.mod h1:HUlfw+l2Zy68ceJavv2zAyArl2fqhGWnMycyt56sBgg=
github.com/hashicorp/cronexpr v1.1.2 h1:wG/ZYIKT+RT3QkOdgYc+xsKWVRgnxJ1OJtjjy84fJ9A=
github.com/hashicorp/cronexpr v1.1.2/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down
35 changes: 29 additions & 6 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"sync"
"time"

"github.com/hashicorp/go-multierror"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/groupcache/singleflight"
Expand Down Expand Up @@ -236,9 +238,10 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c

partialBlocks = make(map[ulid.ULID]bool)
var (
metaChan = make(chan ulid.ULID, concurrency)
eg, gCtx = errgroup.WithContext(ctx)
mu sync.Mutex
metaChan = make(chan ulid.ULID, concurrency)
eg, gCtx = errgroup.WithContext(ctx)
mu, memu sync.Mutex
multiError error
)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
Expand All @@ -247,9 +250,14 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c
// For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix.
// TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work).
metaFile := path.Join(uid.String(), MetaFilename)

ok, err := f.bkt.Exists(gCtx, metaFile)

if err != nil {
return errors.Wrapf(err, "meta.json file exists: %v", uid)
memu.Lock()
multiError = multierror.Append(multiError, errors.Wrapf(err, "meta.json file exists: %v", uid))
memu.Unlock()
continue
}
if !ok {
mu.Lock()
Expand Down Expand Up @@ -283,9 +291,24 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c
}
close(metaChan)

if err := eg.Wait(); err != nil {
return nil, err
multiError = multierror.Append(multiError, eg.Wait())

if multiError != nil {
switch me := multiError.(type) {
case *multierror.Error:
// return the multierror if there are multiple errors wrapped
if len(me.Errors) > 1 {
return nil, multiError
}
// return singular unwrapped error
if len(me.Errors) > 0 {
return nil, me.Errors[0]
}
default:
return nil, multiError
}
}

return partialBlocks, nil
}

Expand Down
86 changes: 86 additions & 0 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
"os"
"path"
Expand All @@ -18,11 +19,13 @@ import (
"time"

"github.com/go-kit/log"
"github.com/hashicorp/go-multierror"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/objtesting"

Expand Down Expand Up @@ -1211,3 +1214,86 @@ func Test_ParseRelabelConfig(t *testing.T) {
testutil.NotOk(t, err)
testutil.Equals(t, "unsupported relabel action: labelmap", err.Error())
}

func Test_ConcurrentLister_channel_deadlock(t *testing.T) {
lister := ConcurrentLister{
bkt: InstrumentedBucketReaderMock{},
logger: nil,
}

outputChannel := make(chan ulid.ULID)
defer close(outputChannel)

timeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

_, err := lister.GetActiveAndPartialBlockIDs(timeout, outputChannel)

multiError := err.(*multierror.Error)

// in case of a deadlock we would not process all requests.
// The presence of all 129 errors means we processed them all without a deadlock
assert.Equal(t, len(multiError.Errors), 64+64+1)
}

type InstrumentedBucketReaderMock struct{}

func (InstrumentedBucketReaderMock) Iter(ctx context.Context, dir string, f func(name string) error, options ...objstore.IterOption) error {
// Concurrency is 64 and the queue has capacity of 64
// Sending 64+64+1 ulids is enough
// 64 to terminate all 64 workers
// 64 more to fill the 64 capacity channel
// 1 extra for the channel writer to block on
for i := 1; i <= 129; i++ {
err := f(ULID(i).String())
if err != nil {
return err
}
}
return nil
}

func (InstrumentedBucketReaderMock) Exists(ctx context.Context, name string) (bool, error) {
// simulating an objstore error
return false, errors.New("Simulated")
}

func (InstrumentedBucketReaderMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
panic("not required")
}

func (InstrumentedBucketReaderMock) Close() error { panic("not required") }

func (InstrumentedBucketReaderMock) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
panic("not required")
}

func (InstrumentedBucketReaderMock) SupportedIterOptions() []objstore.IterOptionType {
panic("not required")
}

func (InstrumentedBucketReaderMock) Get(ctx context.Context, name string) (io.ReadCloser, error) {
panic("not required")
}

func (InstrumentedBucketReaderMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
panic("not required")
}

func (InstrumentedBucketReaderMock) IsObjNotFoundErr(err error) bool { panic("not required") }

func (InstrumentedBucketReaderMock) IsAccessDeniedErr(err error) bool { panic("not required") }

func (InstrumentedBucketReaderMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
panic("not required")
}

func (InstrumentedBucketReaderMock) Upload(ctx context.Context, name string, r io.Reader) error {
panic("not required")
}

func (InstrumentedBucketReaderMock) Delete(ctx context.Context, name string) error {
panic("not required")
}

func (InstrumentedBucketReaderMock) Name() string { panic("not required") }

0 comments on commit 6b73501

Please sign in to comment.