-
Notifications
You must be signed in to change notification settings - Fork 456
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use upsert behavior for datapoints written to the mutable series buffer #876
Conversation
Codecov Report
@@ Coverage Diff @@
## master #876 +/- ##
==========================================
+ Coverage 78.67% 78.71% +0.04%
==========================================
Files 396 397 +1
Lines 33533 33678 +145
==========================================
+ Hits 26382 26511 +129
- Misses 5354 5359 +5
- Partials 1797 1808 +11
Continue to review full report at Codecov.
|
Maybe its too much of a pain, but how do you feel about making the conflict resolution strategy configurable? The main reason being:
|
…s, ensure writing same value doesn't cause new encoder to be created
@richardartoul all feedback addressed, if you write the same value to same timestamp multiple times it will be a no-op for creating buffers, and you can control how it selects different datapoints when different values for the same timestamp are being iterated over. |
|
||
switch i.equalTimesStrategy { | ||
case IterateHighestValue: | ||
sort.Slice(i.earliest, func(a, b int) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this gonna alloc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thankfully no, output of go build -gcflags "-m" .
in this package:
./iterators.go:63:26: (*iterators).current func literal does not escape
./iterators.go:70:26: (*iterators).current func literal does not escape
./iterators.go:87:26: (*iterators).current func literal does not escape
@@ -0,0 +1,156 @@ | |||
// Copyright (c) 2017 Uber Technologies, Inc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2018
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ta, will fix.
|
||
package encoding | ||
|
||
import ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did this file really not have any tests before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haha, it was covered by multi reader iterator tests before, since this file was split as a subtype of multi reader iterator when series iterator needed the same type of logic.
|
||
const ( | ||
// IterateLastPushed is useful for within a single replica, using the last | ||
// immutable buffer that was created to decide which value to choose. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The iterators have to be given in the correct order though right? Maybe clarify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
|
||
// DefaultIterateEqualTimestampStrategy is the default iterate | ||
// equal timestamp strategy. | ||
DefaultIterateEqualTimestampStrategy = IterateEqualTimestampStrategy(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you do DefaultIterateEqualTimestampStrategy = IterateLastPushed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing.
// Return a copy here so callers cannot mutate the known list. | ||
result := make([]IterateEqualTimestampStrategy, 0, | ||
len(validIterateEqualTimestampStrategies)) | ||
copy(result, validIterateEqualTimestampStrategies) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this broken? my understanding is that copy
will copy min(dst, src)
and len(dst)
here is zero
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, good call. I'll add a test for this too.
yaml "gopkg.in/yaml.v2" | ||
) | ||
|
||
func TestValidIterateEqualTimestampStrategies(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If my previous comment is right, you might want to add a length test here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a values check, in addition to the ptr mismatch test.
@@ -101,10 +97,17 @@ func (it *seriesIterator) Close() { | |||
return | |||
} | |||
it.closed = true | |||
it.id.Finalize() | |||
it.nsID.Finalize() | |||
if it.id != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What changed that we suddenly needed these nil checks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No new change, I'm just making sure we do the same nil check for all fields that are nil-able here.
it.iters.reset() | ||
it.iters.setFilter(startInclusive, endExclusive) | ||
if !it.start.IsZero() && !it.end.IsZero() { | ||
it.iters.setFilter(it.start, it.end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this do the right thing is one of them is zero?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the only invalid case is where start is not zero but end is zero.
src/dbnode/encoding/types.go
Outdated
NumEncoded() int | ||
|
||
// LastEncoded returns the last encoded datapoint, useful for | ||
// de-duplicating encoded values. If there no values encoded previously |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are no previously encoded values...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing, ta.
@@ -437,7 +437,7 @@ func TestReadCommitLogMissingMetadata(t *testing.T) { | |||
|
|||
func TestCommitLogReaderIsNotReusable(t *testing.T) { | |||
// Make sure we're not leaking goroutines | |||
defer leaktest.CheckTimeout(t, time.Second)() | |||
defer leaktest.CheckTimeout(t, 10*time.Second)() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if bumping this is necessary, you might just need to rebase master. Prateek upgraded to the latest version of leaktest which fixes this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, ok sure thing.
@@ -859,6 +859,8 @@ func (s *commitLogSource) startM3TSZEncodingWorker( | |||
wroteExisting = false | |||
) | |||
for i := range unmergedBlock { | |||
// TODO(r): Write unit test to ensure that different values that arrive |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good point, did you handle this anywhere? I see a TODO for a unit test, but I don't see the logic at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no test here, I think I'll open an issue to follow up on this. This seems out of scope of this change, it would need to be a followup I believe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a link on the comment in the code here to the issue I created:
#898
b.encoders = append(b.encoders, next) | ||
idx = len(b.encoders) - 1 | ||
|
||
// Upsert/last-write-wins semantics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was skimming through the encoder code and its not clear to me why we need to create a new encoder for this scenario. Does it simplify the iteration logic? it seemed like the encoder could encode multiple different values for the same timestamp
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could, it's just way harder and I don't really want to enforce encoders have to deal with this case right now. We can revisit this later.
src/dbnode/storage/series/buffer.go
Outdated
|
||
b.encoders[idx].lastWriteAt = datapoint.Timestamp | ||
|
||
if b.empty { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason to add this if statement? I assume its just as fast if not faster to just unconditionally update it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's kind of buggy and nasty to keep this tracking everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few comments but is starting to look good. Only thing I'm low confidence on is the iteration logic because I'm not very familiar with it
2883443
to
2cf04be
Compare
@@ -545,8 +543,22 @@ func (b *dbBufferBucket) finalize() { | |||
b.resetBootstrapped() | |||
} | |||
|
|||
func (b *dbBufferBucket) empty() bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you think this will be ok perf-wise? I assume thats why we had the bool to begin with
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this should be fine, we don't actually call it from any high frequency code paths. We used to have a high frequency call site with it, although now it was refactored out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM if you're confident about the iterator changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
tags ident.TagIterator, | ||
startInclusive, endExclusive time.Time, | ||
replicas []MultiReaderIterator, | ||
opts SeriesIteratorOptions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for this
@@ -45,6 +45,10 @@ func (e *nullEncoder) Encode(dp ts.Datapoint, timeUnit xtime.Unit, annotation ts | |||
func (e *nullEncoder) Stream() xio.SegmentReader { | |||
return xio.NewSegmentReader(ts.Segment{}) | |||
} | |||
func (e *nullEncoder) NumEncoded() int { return 0 } | |||
func (e *nullEncoder) LastEncoded() (ts.Datapoint, error) { | |||
return ts.Datapoint{}, fmt.Errorf("not implemented") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use a const errors.New()
instead of the fmt.Errorf
here
@@ -752,6 +802,8 @@ func (b *dbBufferBucket) merge() (mergeResult, error) { | |||
} | |||
}() | |||
|
|||
// Rank bootstrapped blocks as data that has appeared before data that |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm can this be violated during a topology change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is mainly a best effort change to be honest, as we know without keeping a timestamp next to each value of when it was written we can't guarantee selecting the last upserted value when reading values returned from multiple replicas.
It should be a good approximation however, which is what we need right now. When we are ready to begin solving it by storing metadata next to the values we can offer configuration for either strict or best effort upserts.
return err | ||
} | ||
if last.Value == value { | ||
// No-op since matches the current value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
} | ||
|
||
func (i *iterators) len() int { | ||
return len(i.values) | ||
} | ||
|
||
func (i *iterators) current() (ts.Datapoint, xtime.Unit, ts.Annotation) { | ||
return i.earliest.Current() | ||
numIters := len(i.earliest) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could this function be implemented using a heap instead? wondering if we'd avoid sorting for each call to current()
that way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can look at this in a followup change, currently none of the non-default strategies aren't likely to be called very often (only subsets of queries/requests, etc, hence using the setter at a per series iterator granularity).
When we have a more complete implementation that can actually return you the last written value, say with metadata stored alongside the value, we should then look at optimizing this. This is mainly an ergonomic change useful for edge cases rather than supposed to be definitive and optimized.
return currA.Value > currB.Value | ||
}) | ||
|
||
case IterateHighestFrequencyValue: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol this feels so magic-y. Do you have a use in mind for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per docs its basically if you are using consistency: all
and don't mind the read unavailability when not all replicas are available to be read from (say perhaps exact match mode for testing correctness for shadow traffic, etc):
// IterateHighestFrequencyValue is useful across replicas when you want to
// choose the most common appearing value, however you can only use this
// reliably if you wait to successfully fetch values from all replicas, i.e.
// you cannot use this reliably with quorum/majority replicas consistency,
// and only all consistency.
IterateHighestFrequencyValue
filterStart time.Time | ||
filterEnd time.Time | ||
values []Iterator | ||
earliest []Iterator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can I convince you to write a little blurb about the intent behind how earliest
and values
are used in the code? I think I follow based on the implementation of moveToValidNext()
but it's a little convoluted the first time someone looks at it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing, can do.
No description provided.