From 6b735016d9e70292ac13906abe572bb7af44528e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikl=C3=B3s=20F=C3=B6ld=C3=A9nyi?= Date: Mon, 23 Dec 2024 19:03:51 +0100 Subject: [PATCH] #7514 Fix channel deadlock in meta sync fetcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Errors no longer take out the thread with them, instead are collected into a multierror. Signed-off-by: Miklós Földényi --- CHANGELOG.md | 1 + go.mod | 3 ++ go.sum | 1 + pkg/block/fetcher.go | 35 +++++++++++++--- pkg/block/fetcher_test.go | 86 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 120 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9b7990e3c..e52517b385 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#7978](https://github.com/thanos-io/thanos/pull/7978) Receive: Fix deadlock during local writes when `split-tenant-label-name` is used +- [#7933](https://github.com/thanos-io/thanos/pull/7933) *: Fix channel deadlock in meta sync fetcher - [#8016](https://github.com/thanos-io/thanos/pull/8016) Query Frontend: Fix @ modifier not being applied correctly on sub queries. ### Added diff --git a/go.mod b/go.mod index dd5cd1ce44..6556817542 100644 --- a/go.mod +++ b/go.mod @@ -102,6 +102,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require github.com/hashicorp/go-multierror v1.1.1 + require ( cloud.google.com/go v0.115.1 // indirect cloud.google.com/go/auth v0.9.3 // indirect @@ -181,6 +183,7 @@ require ( github.com/gorilla/mux v1.8.0 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect github.com/jcchavezs/porto v0.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect diff --git a/go.sum b/go.sum index a0298af500..3c75873273 100644 --- a/go.sum +++ b/go.sum @@ -1866,6 +1866,7 @@ github.com/hashicorp/consul/api v1.29.4 h1:P6slzxDLBOxUSj3fWo2o65VuKtbtOXFi7TSSg github.com/hashicorp/consul/api v1.29.4/go.mod h1:HUlfw+l2Zy68ceJavv2zAyArl2fqhGWnMycyt56sBgg= github.com/hashicorp/cronexpr v1.1.2 h1:wG/ZYIKT+RT3QkOdgYc+xsKWVRgnxJ1OJtjjy84fJ9A= github.com/hashicorp/cronexpr v1.1.2/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 02e9dec513..d68b834098 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -15,6 +15,8 @@ import ( "sync" "time" + "github.com/hashicorp/go-multierror" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/golang/groupcache/singleflight" @@ -236,9 +238,10 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c partialBlocks = make(map[ulid.ULID]bool) var ( - metaChan = make(chan ulid.ULID, concurrency) - eg, gCtx = errgroup.WithContext(ctx) - mu sync.Mutex + metaChan = make(chan ulid.ULID, concurrency) + eg, gCtx = errgroup.WithContext(ctx) + mu, memu sync.Mutex + multiError error ) for i := 0; i < concurrency; i++ { eg.Go(func() error { @@ -247,9 +250,14 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c // For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix. // TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work). metaFile := path.Join(uid.String(), MetaFilename) + ok, err := f.bkt.Exists(gCtx, metaFile) + if err != nil { - return errors.Wrapf(err, "meta.json file exists: %v", uid) + memu.Lock() + multiError = multierror.Append(multiError, errors.Wrapf(err, "meta.json file exists: %v", uid)) + memu.Unlock() + continue } if !ok { mu.Lock() @@ -283,9 +291,24 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c } close(metaChan) - if err := eg.Wait(); err != nil { - return nil, err + multiError = multierror.Append(multiError, eg.Wait()) + + if multiError != nil { + switch me := multiError.(type) { + case *multierror.Error: + // return the multierror if there are multiple errors wrapped + if len(me.Errors) > 1 { + return nil, multiError + } + // return singular unwrapped error + if len(me.Errors) > 0 { + return nil, me.Errors[0] + } + default: + return nil, multiError + } } + return partialBlocks, nil } diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 5e24e26538..e0e4518f93 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "fmt" + "io" "math/rand" "os" "path" @@ -18,11 +19,13 @@ import ( "time" "github.com/go-kit/log" + "github.com/hashicorp/go-multierror" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/assert" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/objtesting" @@ -1211,3 +1214,86 @@ func Test_ParseRelabelConfig(t *testing.T) { testutil.NotOk(t, err) testutil.Equals(t, "unsupported relabel action: labelmap", err.Error()) } + +func Test_ConcurrentLister_channel_deadlock(t *testing.T) { + lister := ConcurrentLister{ + bkt: InstrumentedBucketReaderMock{}, + logger: nil, + } + + outputChannel := make(chan ulid.ULID) + defer close(outputChannel) + + timeout, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + _, err := lister.GetActiveAndPartialBlockIDs(timeout, outputChannel) + + multiError := err.(*multierror.Error) + + // in case of a deadlock we would not process all requests. + // The presence of all 129 errors means we processed them all without a deadlock + assert.Equal(t, len(multiError.Errors), 64+64+1) +} + +type InstrumentedBucketReaderMock struct{} + +func (InstrumentedBucketReaderMock) Iter(ctx context.Context, dir string, f func(name string) error, options ...objstore.IterOption) error { + // Concurrency is 64 and the queue has capacity of 64 + // Sending 64+64+1 ulids is enough + // 64 to terminate all 64 workers + // 64 more to fill the 64 capacity channel + // 1 extra for the channel writer to block on + for i := 1; i <= 129; i++ { + err := f(ULID(i).String()) + if err != nil { + return err + } + } + return nil +} + +func (InstrumentedBucketReaderMock) Exists(ctx context.Context, name string) (bool, error) { + // simulating an objstore error + return false, errors.New("Simulated") +} + +func (InstrumentedBucketReaderMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Close() error { panic("not required") } + +func (InstrumentedBucketReaderMock) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) SupportedIterOptions() []objstore.IterOptionType { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Get(ctx context.Context, name string) (io.ReadCloser, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) IsObjNotFoundErr(err error) bool { panic("not required") } + +func (InstrumentedBucketReaderMock) IsAccessDeniedErr(err error) bool { panic("not required") } + +func (InstrumentedBucketReaderMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Upload(ctx context.Context, name string, r io.Reader) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Delete(ctx context.Context, name string) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Name() string { panic("not required") }