Use upsert behavior for datapoints written to the mutable series buffer #876

Merged: robskillington merged 15 commits into master from r/upsert-datapoints on Sep 12, 2018

Collaborator

@robskillington robskillington commented Sep 2, 2018

No description provided.

codecov bot commented Sep 2, 2018

Codecov Report

Merging #876 into master will increase coverage by 0.04%.
The diff coverage is 91.9%.

@@            Coverage Diff             @@
##           master     #876      +/-   ##
==========================================
+ Coverage   78.67%   78.71%   +0.04%     
==========================================
  Files         396      397       +1     
  Lines       33533    33678     +145     
==========================================
+ Hits        26382    26511     +129     
- Misses       5354     5359       +5     
- Partials     1797     1808      +11
Flag      Coverage Δ
#dbnode   81.45% <91.03%> (+0.01%) ⬆️
#m3ninx   71.93% <ø> (ø) ⬆️
#query    69.64% <100%> (+0.14%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 2883443...11dfeb6.

Contributor

richardartoul commented Sep 4, 2018

Maybe it's too much of a pain, but how do you feel about making the conflict resolution strategy configurable? The main reasons being:

  1. The existing first-write-wins strategy is useful in certain situations (i.e. for keeping track of whether or not something has occurred in a given period of time by truncating to a given number of hours and then always writing 1.0, à la Utilization Monitor). You could still do this with this change, but it would be a lot more resource intensive.
  2. A deterministic strategy (e.g. the biggest value always wins) would be really useful for when we're trying to do shadow comparisons of one m3db cluster against another, since even though they receive the same writes, they might receive them in slightly different orders.
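
To make the two points above concrete, here is a minimal sketch of why an order-independent strategy helps with shadow comparisons. The `Strategy` type, its constants, and `Resolve` are hypothetical names for illustration only and are not taken from this PR.

```go
package main

import "fmt"

// Strategy is a hypothetical conflict resolution strategy for values
// written to the same timestamp. These names are illustrative only.
type Strategy int

const (
	FirstWriteWins Strategy = iota
	LastWriteWins
	HighestValueWins
)

// Resolve picks one value from the values written to a single timestamp,
// given in arrival order. It assumes len(values) > 0.
func Resolve(s Strategy, values []float64) float64 {
	switch s {
	case LastWriteWins:
		return values[len(values)-1]
	case HighestValueWins:
		best := values[0]
		for _, v := range values[1:] {
			if v > best {
				best = v
			}
		}
		return best
	default: // FirstWriteWins
		return values[0]
	}
}

func main() {
	// Two clusters receive the same writes in slightly different orders.
	clusterA := []float64{1, 3, 2}
	clusterB := []float64{2, 1, 3}

	// Order-dependent: the clusters disagree.
	fmt.Println(Resolve(LastWriteWins, clusterA), Resolve(LastWriteWins, clusterB))
	// Order-independent: the clusters agree.
	fmt.Println(Resolve(HighestValueWins, clusterA), Resolve(HighestValueWins, clusterB))
}
```

With last-write-wins the two clusters return different values for the same set of writes, while highest-value-wins returns the same value regardless of arrival order.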

@robskillington
Collaborator Author

@richardartoul all feedback addressed. If you write the same value to the same timestamp multiple times, it will be a no-op for creating buffers, and you can control which datapoint is selected when different values for the same timestamp are being iterated over.


switch i.equalTimesStrategy {
case IterateHighestValue:
sort.Slice(i.earliest, func(a, b int) bool {
Contributor

Is this gonna alloc?

Collaborator Author

Thankfully no, output of go build -gcflags "-m" . in this package:

./iterators.go:63:26: (*iterators).current func literal does not escape
./iterators.go:70:26: (*iterators).current func literal does not escape
./iterators.go:87:26: (*iterators).current func literal does not escape

@@ -0,0 +1,156 @@
// Copyright (c) 2017 Uber Technologies, Inc.
Contributor

2018

Collaborator Author

Ta, will fix.


package encoding

import (
Contributor

Did this file really not have any tests before?

Collaborator Author

Haha, it was covered by multi reader iterator tests before, since this file was split as a subtype of multi reader iterator when series iterator needed the same type of logic.


const (
// IterateLastPushed is useful for within a single replica, using the last
// immutable buffer that was created to decide which value to choose.
Contributor

The iterators have to be given in the correct order though right? Maybe clarify?

Collaborator Author

Sure thing.


// DefaultIterateEqualTimestampStrategy is the default iterate
// equal timestamp strategy.
DefaultIterateEqualTimestampStrategy = IterateEqualTimestampStrategy(0)
Contributor

Can you do DefaultIterateEqualTimestampStrategy = IterateLastPushed?

Collaborator Author

Sure thing.

// Return a copy here so callers cannot mutate the known list.
result := make([]IterateEqualTimestampStrategy, 0,
len(validIterateEqualTimestampStrategies))
copy(result, validIterateEqualTimestampStrategies)
Contributor

Is this broken? My understanding is that copy will copy min(len(dst), len(src)) elements, and len(dst) here is zero.

Collaborator Author

Ah yes, good call. I'll add a test for this too.
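
For reference, a minimal sketch of the issue raised above and one possible fix. The underlying type of `IterateEqualTimestampStrategy`, the contents of the list, and the function names `brokenCopy` and `ValidIterateEqualTimestampStrategies` are assumptions for illustration (the latter is inferred from the test name in this PR), so the merged code may differ.

```go
package main

import "fmt"

// IterateEqualTimestampStrategy mirrors the type name in this PR; the
// underlying type and the list contents below are assumptions.
type IterateEqualTimestampStrategy uint8

const (
	IterateLastPushed IterateEqualTimestampStrategy = iota
	IterateHighestValue
	IterateHighestFrequencyValue
)

var validIterateEqualTimestampStrategies = []IterateEqualTimestampStrategy{
	IterateLastPushed,
	IterateHighestValue,
	IterateHighestFrequencyValue,
}

// brokenCopy reproduces the reviewed snippet: the destination has length 0,
// so the builtin copy transfers min(len(dst), len(src)) = 0 elements.
func brokenCopy() []IterateEqualTimestampStrategy {
	result := make([]IterateEqualTimestampStrategy, 0,
		len(validIterateEqualTimestampStrategies))
	copy(result, validIterateEqualTimestampStrategies)
	return result
}

// ValidIterateEqualTimestampStrategies allocates the destination with the
// full length so that copy fills it. Callers get their own slice and cannot
// mutate the package-level list.
func ValidIterateEqualTimestampStrategies() []IterateEqualTimestampStrategy {
	result := make([]IterateEqualTimestampStrategy,
		len(validIterateEqualTimestampStrategies))
	copy(result, validIterateEqualTimestampStrategies)
	return result
}

func main() {
	fmt.Println(len(brokenCopy()))                           // 0
	fmt.Println(len(ValidIterateEqualTimestampStrategies())) // 3
}
```

A length assertion in the test, as suggested in the next comment, would catch the empty result.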

yaml "gopkg.in/yaml.v2"
)

func TestValidIterateEqualTimestampStrategies(t *testing.T) {
Contributor

If my previous comment is right, you might want to add a length test here

Collaborator Author

I added a values check, in addition to the ptr mismatch test.

@@ -101,10 +97,17 @@ func (it *seriesIterator) Close() {
return
}
it.closed = true
-it.id.Finalize()
-it.nsID.Finalize()
+if it.id != nil {
Contributor

What changed that we suddenly needed these nil checks?

Collaborator Author

No new change, I'm just making sure we do the same nil check for all fields that are nil-able here.

it.iters.reset()
it.iters.setFilter(startInclusive, endExclusive)
if !it.start.IsZero() && !it.end.IsZero() {
it.iters.setFilter(it.start, it.end)
Contributor

Does this do the right thing if one of them is zero?

Collaborator Author

Yes, the only invalid case is where start is not zero but end is zero.

NumEncoded() int

// LastEncoded returns the last encoded datapoint, useful for
// de-duplicating encoded values. If there no values encoded previously
Contributor

If there are no previously encoded values...

Collaborator Author

Sure thing, ta.

@@ -437,7 +437,7 @@ func TestReadCommitLogMissingMetadata(t *testing.T) {

func TestCommitLogReaderIsNotReusable(t *testing.T) {
// Make sure we're not leaking goroutines
-defer leaktest.CheckTimeout(t, time.Second)()
+defer leaktest.CheckTimeout(t, 10*time.Second)()
Contributor

Not sure if bumping this is necessary; you might just need to rebase on master. Prateek upgraded to the latest version of leaktest, which fixes this.

Collaborator Author

Hm, ok sure thing.

@@ -859,6 +859,8 @@ func (s *commitLogSource) startM3TSZEncodingWorker(
wroteExisting = false
)
for i := range unmergedBlock {
// TODO(r): Write unit test to ensure that different values that arrive
Contributor

This is a good point, did you handle this anywhere? I see a TODO for a unit test, but I don't see the logic at all.

Collaborator Author

There is no test here, I think I'll open an issue to follow up on this. This seems out of scope of this change, it would need to be a followup I believe.

Collaborator Author

@robskillington robskillington Sep 11, 2018

I added a link on the comment in the code here to the issue I created:
#898

b.encoders = append(b.encoders, next)
idx = len(b.encoders) - 1

// Upsert/last-write-wins semantics.
Contributor

I was skimming through the encoder code and it's not clear to me why we need to create a new encoder for this scenario. Does it simplify the iteration logic? It seemed like the encoder could encode multiple different values for the same timestamp.

Collaborator Author

It could, it's just way harder and I don't really want to enforce encoders have to deal with this case right now. We can revisit this later.
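
A heavily simplified sketch of the behaviour discussed in this thread: a repeated value for the same timestamp is a no-op, while a different value goes into a fresh encoder so that each encoder never holds two values for one timestamp. The `datapoint`, `encoder`, and `bucket` types here are stand-ins rather than the real m3db types, and the sketch assumes only the newest encoder is inspected on write, which the actual buffer may not do.

```go
package main

import "fmt"

// datapoint, encoder and bucket are simplified stand-ins, not m3db types.
type datapoint struct {
	timestamp int64
	value     float64
}

// encoder is append-only and holds at most one value per timestamp.
type encoder struct {
	points []datapoint
}

type bucket struct {
	encoders []*encoder
}

// write applies upsert semantics, looking only at the newest encoder.
func (b *bucket) write(timestamp int64, value float64) {
	if n := len(b.encoders); n > 0 {
		newest := b.encoders[n-1]
		last := newest.points[len(newest.points)-1]
		switch {
		case timestamp > last.timestamp:
			// In-order write: append to the existing encoder.
			newest.points = append(newest.points, datapoint{timestamp, value})
			return
		case timestamp == last.timestamp && value == last.value:
			// No-op since it matches the current value.
			return
		}
	}
	// Different value for the same timestamp, an out-of-order write, or the
	// first write: start a new encoder.
	b.encoders = append(b.encoders, &encoder{points: []datapoint{{timestamp, value}}})
}

// read returns the value from the most recently created encoder that holds
// the timestamp, which gives last-write-wins within this bucket.
func (b *bucket) read(timestamp int64) (float64, bool) {
	for i := len(b.encoders) - 1; i >= 0; i-- {
		for _, p := range b.encoders[i].points {
			if p.timestamp == timestamp {
				return p.value, true
			}
		}
	}
	return 0, false
}

func main() {
	b := &bucket{}
	b.write(10, 1.0)
	b.write(10, 1.0) // same value: no new encoder
	b.write(10, 2.0) // different value: new encoder
	v, _ := b.read(10)
	fmt.Println(len(b.encoders), v)
}
```

Keeping one value per timestamp per encoder pushes the conflict handling into the iteration step, which matches the reply above about not wanting encoders to deal with this case yet.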


b.encoders[idx].lastWriteAt = datapoint.Timestamp

if b.empty {
Contributor

Any reason to add this if statement? I assume it's just as fast, if not faster, to just unconditionally update it.

Collaborator Author

Yeah, it's kind of buggy and nasty to keep this tracking everywhere.

Contributor

@richardartoul richardartoul left a comment

Left a few comments, but this is starting to look good. The only thing I have low confidence in is the iteration logic, because I'm not very familiar with it.

@@ -545,8 +543,22 @@ func (b *dbBufferBucket) finalize() {
b.resetBootstrapped()
}

func (b *dbBufferBucket) empty() bool {
Contributor

Do you think this will be OK perf-wise? I assume that's why we had the bool to begin with.

Collaborator Author

Yeah, this should be fine; we don't actually call it from any high-frequency code paths. We used to have a high-frequency call site for it, although it has since been refactored out.

richardartoul previously approved these changes Sep 11, 2018
Contributor

@richardartoul richardartoul left a comment

LGTM if you're confident about the iterator changes.

Contributor

@richardartoul richardartoul left a comment

LGTM

tags ident.TagIterator,
startInclusive, endExclusive time.Time,
replicas []MultiReaderIterator,
opts SeriesIteratorOptions,
Collaborator

+1 for this

@@ -45,6 +45,10 @@ func (e *nullEncoder) Encode(dp ts.Datapoint, timeUnit xtime.Unit, annotation ts
func (e *nullEncoder) Stream() xio.SegmentReader {
return xio.NewSegmentReader(ts.Segment{})
}
func (e *nullEncoder) NumEncoded() int { return 0 }
func (e *nullEncoder) LastEncoded() (ts.Datapoint, error) {
return ts.Datapoint{}, fmt.Errorf("not implemented")
Collaborator

nit: use a const errors.New() instead of the fmt.Errorf here
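
A small sketch of what this nit could look like. Go does not allow error values to be declared with `const`, so the usual form is a package-level `var` sentinel. The name `errNotImplemented` and the `datapoint` stand-in for `ts.Datapoint` are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotImplemented is a hypothetical sentinel name. It is allocated once at
// package init rather than on every call, and callers can compare against it.
var errNotImplemented = errors.New("not implemented")

// datapoint is a stand-in for ts.Datapoint.
type datapoint struct {
	Value float64
}

type nullEncoder struct{}

// LastEncoded always fails for the null encoder and returns the sentinel.
func (e *nullEncoder) LastEncoded() (datapoint, error) {
	return datapoint{}, errNotImplemented
}

func main() {
	_, err := (&nullEncoder{}).LastEncoded()
	fmt.Println(errors.Is(err, errNotImplemented)) // true
}
```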

@@ -752,6 +802,8 @@ func (b *dbBufferBucket) merge() (mergeResult, error) {
}
}()

// Rank bootstrapped blocks as data that has appeared before data that
Collaborator

hm can this be violated during a topology change?

Collaborator Author

This is mainly a best-effort change, to be honest: without keeping a timestamp next to each value recording when it was written, we can't guarantee selecting the last upserted value when reading values returned from multiple replicas.

It should be a good approximation however, which is what we need right now. When we are ready to begin solving it by storing metadata next to the values we can offer configuration for either strict or best effort upserts.

return err
}
if last.Value == value {
// No-op since matches the current value
Collaborator

+1

}

func (i *iterators) len() int {
return len(i.values)
}

func (i *iterators) current() (ts.Datapoint, xtime.Unit, ts.Annotation) {
-return i.earliest.Current()
+numIters := len(i.earliest)
Collaborator

could this function be implemented using a heap instead? wondering if we'd avoid sorting for each call to current() that way

Collaborator Author

We can look at this in a followup change. Currently none of the non-default strategies are likely to be called very often (only for subsets of queries/requests, etc., hence using the setter at a per-series-iterator granularity).

When we have a more complete implementation that can actually return the last written value, say with metadata stored alongside the value, we should then look at optimizing this. This is mainly an ergonomic change useful for edge cases, rather than something meant to be definitive and optimized.

return currA.Value > currB.Value
})

case IterateHighestFrequencyValue:
Collaborator

lol this feels so magic-y. Do you have a use in mind for it?

Collaborator Author

@robskillington robskillington Sep 12, 2018

As per the docs, it's basically for when you are using consistency: all and don't mind the read unavailability when not all replicas are available to be read from (say, exact match mode for testing correctness with shadow traffic, etc.):

// IterateHighestFrequencyValue is useful across replicas when you want to
// choose the most common appearing value, however you can only use this
// reliably if you wait to successfully fetch values from all replicas, i.e.
// you cannot use this reliably with quorum/majority replicas consistency,
// and only all consistency.
IterateHighestFrequencyValue
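
To illustrate why this strategy is only reliable with all replicas, here is a rough sketch of picking the most common value among the replicas that responded. The `highestFrequency` helper is hypothetical and not the PR's implementation, which sorts iterators inside `current()`. It assumes a non-empty input with no NaN values.

```go
package main

import "fmt"

// highestFrequency returns the value that appears most often among the
// values read for one timestamp, one value per responding replica. Ties go
// to the value that reached the top count first, so the result can depend
// on ordering. Hypothetical helper; assumes len(values) > 0 and no NaNs.
func highestFrequency(values []float64) float64 {
	counts := make(map[float64]int, len(values))
	best, bestCount := values[0], 0
	for _, v := range values {
		counts[v]++
		if counts[v] > bestCount {
			best, bestCount = v, counts[v]
		}
	}
	return best
}

func main() {
	// All three replicas responded: there is a clear majority value.
	fmt.Println(highestFrequency([]float64{1, 2, 1}))

	// Only a quorum of two responded and they disagree: the answer now
	// depends on which replica is seen first.
	fmt.Println(highestFrequency([]float64{1, 2}), highestFrequency([]float64{2, 1}))
}
```

With three replicas and a full read, a single divergent replica is outvoted. With only two responses that disagree there is no majority, which is the unreliability the doc comment warns about for quorum/majority consistency.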

filterStart time.Time
filterEnd time.Time
values []Iterator
earliest []Iterator
Collaborator

Can I convince you to write a little blurb about the intent behind how earliest and values are used in the code? I think I follow based on the implementation of moveToValidNext() but it's a little convoluted the first time someone looks at it.

Collaborator Author

Sure thing, can do.

@robskillington robskillington merged commit 0061da1 into master Sep 12, 2018
@robskillington robskillington deleted the r/upsert-datapoints branch September 12, 2018 02:51