-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resort store response set on internal label dedup #6317
Resort store response set on internal label dedup #6317
Conversation
Hello 👋 Looks like there was no activity on this amazing PR for the last 30 days. |
e0a0529
to
4f3c8ad
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a lot of reasoning kind of hidden behind the "bloom filter" name.
We are adding it in many different places, including the gRPC definition of the Store API, and its purpose is still not clear. On top of this, it also couples the Store API definition to one specific implementation (bloom filter) of "a way to do this thing".
I think this happens because it is named after what is it and not what it does. In Go, we kind of don't need to name things after what they are -- their type will tell us this.
But what is this bloom filter used for? Seems like is doing a "quick check" for whether a series request includes internal replica labels. So potentially we can name this better? Maybe "internal label checker" seems like a good proposal. One day we might switch from a bloom filter to something else, or add another mechanism (i.e. what if I wanted to store this information within a Redis or Memcache instance?) and things would be more extensible.
Now on a more technical question: why do we need to transfer the bloom filter over gRPC? Storing on Redis with the client-side cache features we get from the rueidis lib seem like a nice approach too.
pkg/store/proxy_heap.go
Outdated
var labelsToRemove map[string]struct{} | ||
if !st.SupportsWithoutReplicaLabels() && len(req.WithoutReplicaLabels) > 0 { | ||
labelsToRemove := make(map[string]struct{}) | ||
dedupByInternalLabel := hasInternalReplicaLabels(st, req) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we put this behind a feature flag? For Cortex, this code path seems an unnecessary overhead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does cortex use the proxy_heap
? But yes, we should add a FF in the bucket store
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah proxy heap will be used by store gateway for lazy series? So it is always used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, that's correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a feature flag in fpetkovski#5
Was talking with @saswatamcode earlier today and I was wondering whether we could add some metrics to the global sorting and bloom filter to try to quantify how many times a global sort is being executed or skipped. Potentially could also measure how long the bloom filter update is taking. What do you think? |
Sounds like a good idea 👍 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some small nits for cleanliness but otherwise thanks for doing this :) i'm excited about what we can do wtih this bloomfilter now it's there
test/e2e/query_test.go
Outdated
// This test is expected to fail until the bug outlined in https://github.com/thanos-io/thanos/issues/6257 | ||
// is fixed. This means that it will return double the expected series until then. | ||
// This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. | ||
// Until the bug was fixed, this testcase would return double the expected series. | ||
expectedDedupBug: true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be cleaning up this bool?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great found, sir. We should get rid of this bool.
test/e2e/receive_test.go
Outdated
// This should've returned only 2 series, but is returning 4 until the problem reported in | ||
// https://github.com/thanos-io/thanos/issues/6257 is fixed | ||
// This is a regression test for the bug outlined in https://github.com/thanos-io/thanos/issues/6257. | ||
// Until the bug was fixed, this testcase would return 4 series instead of 2. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Until the bug was fixed, this testcase would return 4 series instead of 2.
nit: Unnecessary clarification.
cmd/thanos/query.go
Outdated
return &infopb.StoreInfo{ | ||
MinTime: mint, | ||
MaxTime: maxt, | ||
SupportsSharding: true, | ||
SupportsWithoutReplicaLabels: true, | ||
TsdbInfos: proxy.TSDBInfos(), | ||
LabelNamesBloom: infopb.NewBloomFilter(labelNamesBloom), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: LabelNamesBloom does not elude to what it's actually used for. Type can be inferred, should instead allude to what it's used for. In this case 'indexedLabels' for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So in this case, the store info is a data transfer struct, so I think it's fine to keep the Bloom suffix. I think it's important to know what those bytes actually are. The same way we have MinTime
and MaxTime
cmd/thanos/query.go
Outdated
// Start bloom name filter updater. | ||
{ | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
level.Debug(logger).Log("msg", "setting up periodic label names bloom filter update") | ||
g.Add(func() error { | ||
return runutil.Repeat(10*time.Second, ctx.Done(), func() error { | ||
level.Debug(logger).Log("msg", "Starting label names bloom filter update") | ||
|
||
if err := proxy.UpdateLabelNamesBloom(ctx); err != nil { | ||
return err | ||
} | ||
|
||
level.Debug(logger).Log("msg", "Finished label names bloom filter update") | ||
return nil | ||
}) | ||
}, func(err error) { | ||
cancel() | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Might be cleaner have goroutine errors/invocation handled in a different object (e.g. StoreLabelIndexer
) that takes a store Client
and invokes UpdateLabelNamesBloom
at some refresh interval. We need to do this for every store, and doing it inline is a bit of a refactoring nightmare.
pkg/bloom/bloom.go
Outdated
"github.com/bits-and-blooms/bloom" | ||
) | ||
|
||
const FilterErrorRate = 0.01 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: might be good to add some clarification on what this error rate represents (margin of error that bloom filter will return false for a value that it does contain)
labelSetFunc func() []labelpb.ZLabelSet | ||
timeRangeFunc func() (int64, int64) | ||
tsdbOpts *tsdb.Options | ||
store *store.TSDBStore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice cleanup <3
pkg/store/bucket.go
Outdated
bmtx sync.Mutex | ||
labelNamesBloom bloom.Filter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
superNit: Doesn't seem like the right place for the mutex to be managed, perhaps this should be moved into LabelNamesBloom
for cleaner concurrency safeness.
pkg/store/bucket.go
Outdated
mtx.Lock() | ||
for _, n := range result { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
superNit: Another argument for moving bmtx
is that we're juggling more than one mutex in this func, would reduce mental load reading if we didn't have to reason about concurrency in multiple dimensions :) (supernit for a reason
)
pkg/store/bucket.go
Outdated
g, _ := errgroup.WithContext(ctx) | ||
|
||
var mtx sync.Mutex | ||
names := make(map[string]struct{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why we're using a map instead of struct if we never populate struct is always empty?
032e64b
to
a29cc58
Compare
61a00d7
to
341e874
Compare
When deduplicating on labels which are stored internally in TSDB, the store response set needs to be resorted after replica labels are removed. In order to detect when deduplication by internal labels happens, this PR adds a bloom filter with all label names to the Info response. When a replica label is present in this bloom filter for an individual store, the proxy heap would resort a response set from that store before merging in the result with the rest of the set. Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
4fb3558
to
0ea795e
Compare
Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
@GiedriusS @saswatamcode @douglascamata @moadz I have modified this PR to use a cuckoo filter and resort the series response in the store itself. Please take another look, the implementation is now simpler since we dont have to send additional data to the querier. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! This looks good to me! Let's get this merged and release v0.32! 🙂
} | ||
|
||
func NewFromStrings(items ...string) Set { | ||
f := cuckoo.NewFilter(uint(len(items))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could estimate the size of the underlying slices as per the comments here https://github.com/seiflotfy/cuckoofilter/blob/master/cuckoofilter.go#L21-L26 and add some way of adding an upper limit for this, something like maybe 5MB
by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, what action do we take if the limit is exceeded?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a good choice would be to print a warning message and then always force sorting? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if that is a good tradeoff though, because 5MB is a fairly low price to pay compared to the increase in memory required for buffering and resorting series. If 1000000
is ~1MB, I think it will be very hard to have so many label names for memory of the filter to be a problem. Maybe we should check behavior in production before we add limits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 makes sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very elegant implementation, thanks for this!
I think it's worth the try. A little bit concerned on the overhead for the whole system (generally should be insignificant, but we have to try to know). I also wonder how system handles eventual consistency (I assume we give inaccurate query results?). Perhaps there is a benefit to turn on/off this filtering system on demand?
// Start bloom name filter updater. | ||
{ | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
level.Debug(logger).Log("msg", "setting up periodic update for label names") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
level.Debug(logger).Log("msg", "setting up periodic update for label names") | |
level.Info(logger).Log("msg", "setting up periodic update for label names") |
I guess info would make sense here
As for consistency it's the same like with a sharded Thanos Store - blocks are not loaded at the same time on all nodes. #6317 (comment) perhaps the limit flag could serve as a way to disable this i.e. setting to 0 would disable this functionality on a node and show that all label names are available. Not in all setups reading data from remote object storage costs. And also I would be against removing the optimizations since they cut down query duration by 30%-40%. |
I think in case of lack of consistency, or even a false positive, what happens is a global resort. Am I right? |
Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
@GiedriusS happy to hear I didn't break Thanos for nothing =DDDDD Coincidentally with this implementation we go into Monarch design even more (public info: https://www.vldb.org/pvldb/vol13/p3181-adams.pdf, and yes, I'm biased 🙃). Essentially Monarch has Field Hints, (so kind of our labels) that it updates and consult on every query 🙈 So... let's double check consistency issue, otherwise LGTM (: |
Signed-off-by: Filip Petkovski <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 let's see how much RAM the filters will need
@@ -1240,7 +1253,9 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill | |||
} | |||
|
|||
// Series implements the storepb.StoreServer interface. | |||
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { | |||
func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { | |||
srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be a yet another way of performing a race inside of the storegateway but let's work on fixing this now before the release 👍 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually might be wrong because the Close() functions will still run at the same time 🤔 let's merge this and test it
Signed-off-by: Filip Petkovski <[email protected]>
Ok, time to try it out :) |
Awesome! So happy to see this merged. Thanks a lot, folks! 🙇 |
Btw, small nit: next time we need to remember to squash and merge. 😅 |
When deduplicating on labels which are stored internally in TSDB, the store response set needs to be resorted after replica labels are removed.
In order to detect when deduplication by internal labels happens, this PR adds a cuckoo filter with all label names to all store implementations. When a replica label is present in this filter, the store will resort the Series response set before returning it to the querier.
Fixes #6257.
Closes #6296.
Changes
Verification