Update Aggregator and AggregatorFactory interfaces to improve mem estimates #12073
Conversation
This pull request introduces 2 alerts when merging 780e50da8202e31f69d9c6ceacdd42ed4f14e57c into fe71fc4 - view on LGTM.com.
```java
    sketchField.setAccessible(true);
  }
  catch (NoSuchFieldException | ClassNotFoundException e) {
    LOG.error(e, "Could not initialize 'sketchField'");
```
This will only happen if someone happens to have loaded a new/different version of sketches than the one this code actually depends on. If that happens, this error puts something into the logs that will be ignored (people don't look at logs until something actually explodes), and the failure is then silently swallowed. Once it is silently swallowed, the estimation becomes incorrect and potentially starts causing OOMs where OOMs didn't exist previously. If this happens, it will be super hard to track down why.
I would recommend that we actually explode loudly throwing an error out of the static initializer (which should effectively kill the process from actually starting in the first place). If we want a way for someone to say "I know what I'm doing, ignore this please", we can add an extra config that the error message in the exception points to as a way to ignore things.
Okay. For now, we can just fail loudly. The additional config can be done as a follow up.
Force-pushed from 2eaeab1 to 348f17e.
This pull request introduces 3 alerts when merging 348f17e into eb0bae4 - view on LGTM.com.
```
@@ -478,6 +479,11 @@ public IncrementalIndexAddResult add(InputRow row) throws IndexSizeExceededException
     return add(row, false);
   }

+  public IncrementalIndexAddResult add(InputRow row, boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException
+  {
+    return add(row, false, true);
```
This appears to be ignoring skipMaxRowsInMemoryCheck. Is that intentional? Probably worth a comment as to why, if it is.
Thanks a lot for catching this! I must have missed it.
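For clarity, a minimal sketch of the corrected delegation. The assumption (the diff doesn't show the three-argument overload's signature) is that its third argument is the one toggling the max-rows-in-memory check:

```java
// Hypothetical sketch: forward the caller's flag instead of hard-coding it.
public IncrementalIndexAddResult add(InputRow row, boolean skipMaxRowsInMemoryCheck)
    throws IndexSizeExceededException
{
  // Previously `return add(row, false, true);` silently dropped the flag.
  return add(row, false, skipMaxRowsInMemoryCheck);
}
```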
overall makes sense 👍
@kfaraz have you done any measurement of the performance impact before/after this change so we know what we are getting ourselves into? Not sure which benchmarks would be most appropriate off the top of my head
```
@@ -160,4 +173,16 @@ public int getIdForNull()
       lock.readLock().unlock();
     }
   }

+  private long getObjectSize(@Nonnull T object)
```
hmm, this method is presumptuous and breaks the contract of this class being generic. I think a size estimator function should be passed into this method, and it needs to be public so that callers can override it. Or maybe it should be abstract and some StringDimensionDictionary should be implemented to minimize function calls, since it's going to be a pretty hot method.
Thanks for pointing this out! I will see how we can make this cleaner.
+1.
Used the StringDimensionDictionary suggestion, although I have not made it abstract, so that implementations using the DimensionDictionary can continue to use it as the base concrete class.
```java
public class AggregatorAndSize
{
  // TODO: include default overhead for object sizes
```
nit: unresolved todo
Fixed. Addressed in the caller.
```
@@ -127,7 +127,7 @@
   * @return An array containing an encoded representation of the input row value.
   */
  @Nullable
- EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions);
+ EncodedDimensionValue<EncodedKeyComponentType> processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions);
```
nit: javadoc params and return type need updating.. but this isn't really new, it basically needed updating long ago, it is not always an array 😅
Fixed.
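As a sketch, the updated javadoc might read along these lines (the wording is an assumption; only the signature comes from the diff above):

```java
/**
 * Processes the given raw row value(s) into an unsorted encoded key component.
 *
 * @param dimValues             raw value(s) from the input row, may be null
 * @param reportParseExceptions whether parse failures should surface as exceptions
 * @return the encoded key component (not necessarily an array, e.g. a Long
 *         for a long-typed dimension) together with the incremental heap
 *         size its encoding added
 */
@Nullable
EncodedDimensionValue<EncodedKeyComponentType> processRowValsToUnsortedEncodedKeyComponent(
    @Nullable Object dimValues,
    boolean reportParseExceptions
);
```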
```
@@ -271,6 +271,7 @@ private Sink getSink(long timestamp)
       config.getAppendableIndexSpec(),
       config.getMaxRowsInMemory(),
       config.getMaxBytesInMemoryOrDefault(),
+      true,
```
I've noticed a few places that aren't wired up to config are using true, so they use the new behavior with no way out. Is that intentional? This one in particular probably doesn't matter all that much these days (I hope), but I'm less sure about all of them, and it isn't consistent because I see some hard-coded false in there too.
The value of the flag useMaxMemoryEstimates = true represents the old behaviour. The hard-coding has been done only for the following classes:
- RealtimePlumber and related classes (hopefully not used anymore)
- OnHeapIncrementalIndexBenchmark (accidentally hardcoded to false, fixing this)

It should not be hard-coded anywhere else, except maybe in tests.
```
@@ -557,12 +573,17 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row)
       DimensionIndexer indexer = desc.getIndexer();
       Object dimsKey = null;
       try {
-        dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);
+        final EncodedDimensionValue<?> encodedDimensionValue
+            = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);
```
I guess due to the way this refactor was done (compared to aggs) there is no real way to turn off calculating the estimates, even if we aren't using them. Maybe it doesn't matter if there is no/minimal performance impact.
Yes, I was not too sure about this either. I will take another look and see if we can separate the two flows without too much duplication.
Updated DimensionDictionary and StringDimensionIndexer to turn off estimations if not needed.
Thanks for the review, @clintropolis !
Looks good overall. You should also document the approach that was finalized in the proposal and that this implementation is based on. I think AggregatorFactory#factorizeWithSize is probably a good place for writing that doc and the overall approach.
```
@@ -42,6 +42,12 @@
  {
    void aggregate();

+   default long aggregateWithSize()
```
this method needs javadocs
Thanks for the reminder, @abhishekagarwal87. I am adding javadocs wherever missing.
What is the contract of this value? I don't see the word estimate in here, but think it probably should be... should implementors over-estimate if exact sizing is not possible, or is under-estimating fine? Should there be a warning that the default estimate is used? (I imagine this would be very noisy if it is done per aggregate call... so I don't really recommend doing it here or anything...)
Should there be a warning that the default estimate is used?
It would make sense to give this warning when factorizeWithSize is overridden but aggregateWithSize is not. In such a case, we might be significantly underestimating the memory usage.
As you said, doing it here might be noisy. A viable approach could be to have factorizeWithSize return a wrapper Aggregator which does not allow the regular aggregate (will be addressed in a subsequent PR).
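Per the PR description at the bottom of this page, the default implementation calls aggregate() and returns 0. A trimmed sketch of the interface (other Aggregator methods omitted), with javadoc along the lines the reviewers asked for:

```java
public interface Aggregator
{
  void aggregate();

  /**
   * Aggregates and returns an estimate, in bytes, of the heap memory this
   * call added on top of the initial size reported at factorization time.
   * The default delegates to aggregate() and reports zero growth, which
   * under-estimates any aggregator whose footprint actually grows.
   */
  default long aggregateWithSize()
  {
    aggregate();
    return 0;
  }
}
```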
```java
// TODO: include default overhead for object sizes

private final Aggregator aggregator;
private final long initialSizeBytes;
```
Can you add more info? E.g., is this the total on-heap footprint including JVM object overhead, or is that overhead not considered in this initial size?
Added. It should account for JVM object overhead too.
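A sketch of the resulting class: the fields appear in the diff above, while the constructor, accessors, and javadoc wording are assumptions:

```java
public class AggregatorAndSize
{
  private final Aggregator aggregator;

  /**
   * Initial heap footprint of the aggregator in bytes, including JVM object
   * overhead (per the resolution of the comment above).
   */
  private final long initialSizeBytes;

  public AggregatorAndSize(Aggregator aggregator, long initialSizeBytes)
  {
    this.aggregator = aggregator;
    this.initialSizeBytes = initialSizeBytes;
  }

  public Aggregator getAggregator()
  {
    return aggregator;
  }

  public long getInitialSizeBytes()
  {
    return initialSizeBytes;
  }
}
```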
```java
{
  @Nullable
  private final K value;
  private final long incrementalSizeBytes;
```
Can you add a comment on what this is the incremental size of?
Added.
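A sketch of what the added comment conveys: incrementalSizeBytes is the extra heap the encoding added (e.g. new dictionary entries), not the total size of the value itself. The constructor and comment wording are assumptions:

```java
import javax.annotation.Nullable;

public class EncodedDimensionValue<K>
{
  @Nullable
  private final K value;

  /** Estimated heap bytes newly allocated while encoding {@link #value}. */
  private final long incrementalSizeBytes;

  public EncodedDimensionValue(@Nullable K value, long incrementalSizeBytes)
  {
    this.value = value;
    this.incrementalSizeBytes = incrementalSizeBytes;
  }
}
```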
```
@@ -80,6 +81,14 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory)
     return new SketchAggregator(selector, size);
   }

+  @Override
+  public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
```
this needs some documentation.
Added.
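A sketch of what such a documented override might look like, reusing the selector and size from factorize() above; getInitialSizeBytes() is an assumed helper on SketchAggregator, not a confirmed API:

```java
@Override
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
{
  // Build the aggregator exactly as factorize() does (selector construction
  // elided in the diff above).
  final SketchAggregator aggregator = new SketchAggregator(selector, size);
  // Report the sketch's initial on-heap footprint, including JVM object
  // overhead, via an assumed helper.
  return new AggregatorAndSize(aggregator, aggregator.getInitialSizeBytes());
}
```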
@abhishekagarwal87 , I have added an overview of the approach in the javadoc of AggregatorFactory#factorizeWithSize. Please let me know if this is sufficient.
This pull request introduces 1 alert when merging 9908c0b into b55f7a2 - view on LGTM.com.
```java
 *
 * @return AggregatorAndSize which contains the actual aggregator and its initial size.
 */
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
```
Same question about the contract of the returned sizes. Also, I wonder if there is anything we could do to make sure this method is overridden if aggregateWithSize is implemented, so that the initial size is not the max size...
Updated the javadoc to advise on the required estimation.
Also, I wonder if there is anything we could do to make sure this method is overridden if aggregateWithSize is implemented
I guess it is okay even if it isn't overridden because we would only be overestimating which would not cause failures, only somewhat poorer estimates.
The other way around is probably more of an issue, i.e. overriding factorizeWithSize but not aggregateWithSize. In such a case, factorizeWithSize would give a small initial size, which would never increase because aggregateWithSize would always return 0.
In either case, this issue is a problem only if we use the new behaviour, i.e. useMaxMemoryEstimates = false. I guess we could address this in a subsequent PR.
```java
{
  final int[] encodedDimensionValues;
  final int oldDictSize = dimLookup.size();
  final long oldDictSizeInBytes = useMaxMemoryEstimates ? 0 : dimLookup.sizeInBytes();
```
nit: this is only used inside of the else near the end of the method
We need the size of the dictionary before adding the dimension values.
At the end, we take the final size of the dictionary and check the diff.
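A sketch of the before/after pattern being defended, under the assumption that encoding happens between the two sizeInBytes() calls (names mirror the diff; the surrounding shape is illustrative):

```java
// Snapshot the dictionary size before encoding; charge only the delta after.
final long oldDictSizeInBytes = useMaxMemoryEstimates ? 0 : dimLookup.sizeInBytes();

// ... encode the row values, possibly adding new entries to dimLookup ...

long effectiveSizeBytes = 0;
if (!useMaxMemoryEstimates) {
  // Only entries newly added to the dictionary count as incremental memory.
  effectiveSizeBytes += dimLookup.sizeInBytes() - oldDictSizeInBytes;
}
```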
```java
{
  super(useMaxMemoryEstimates ? new DimensionDictionary<>() : new StringDimensionDictionary());
```
nit: this seems strange/confusing. Why wouldn't StringDimensionIndexer always use StringDimensionDictionary here? It seems like it could just pass in a value of false to control the value of computeOnHeapSize, instead of sometimes not using StringDimensionDictionary.
You are right, it is weird.
Fixed it.
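A sketch of the agreed fix (other constructor parameters omitted; the computeOnHeapSize parameter name comes from the review comment above):

```java
public StringDimensionIndexer(boolean useMaxMemoryEstimates)
{
  // Always use the String specialization; the flag only toggles whether
  // on-heap sizes are computed.
  super(new StringDimensionDictionary(/* computeOnHeapSize = */ !useMaxMemoryEstimates));
}
```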
```java
static {
  try {
    SKETCH_FIELD = Class.forName("org.apache.datasketches.theta.UnionImpl")
                        .getDeclaredField("gadget_");
    SKETCH_FIELD.setAccessible(true);
  }
  catch (NoSuchFieldException | ClassNotFoundException e) {
    throw new ISE(e, "Could not initialize SketchAggregator");
  }
```
this seems worth a comment, and maybe a link to the code?
Exceptions in static initialization blocks can surface as very weird errors: http://javaeesupportpatterns.blogspot.com/2012/07/javalangnoclassdeffounderror-how-to.html.
Maybe we can just move this initialization to the constructor? We need not worry too much about thread safety since it's ok even if SKETCH_FIELD gets constructed twice.
👍
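A sketch of the constructor-based initialization being agreed to here. The benign race is acceptable because initializing the field twice is harmless, and a failure now surfaces as a clear ISE at aggregator construction instead of an opaque NoClassDefFoundError from a failed static initializer. The helper name and the non-final field are assumptions:

```java
private static Field SKETCH_FIELD;  // java.lang.reflect.Field; no longer final

// Called from the SketchAggregator constructor instead of a static block.
static void initializeSketchField()
{
  if (SKETCH_FIELD != null) {
    return;  // already initialized; a duplicate initialization is harmless
  }
  try {
    final Field field = Class.forName("org.apache.datasketches.theta.UnionImpl")
                             .getDeclaredField("gadget_");
    field.setAccessible(true);
    SKETCH_FIELD = field;
  }
  catch (NoSuchFieldException | ClassNotFoundException e) {
    throw new ISE(e, "Could not initialize SketchAggregator");
  }
}
```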
@kfaraz CI failures might be legit. Can you fix those before merging?
Fixes #12022

Description

The current implementation of memory estimation in OnHeapIncrementalIndex:
- uses guessAggregatorHeapFootprint() to calculate the max memory a metric aggregator can use
- the current implementation of StringDimensionIndexer similarly estimates a maximum size for encoded dimension values

Because of the above, the memory usage tends to be over-estimated, which leads to more persistence cycles than necessary.

This PR replaces the max estimation mechanism with getting the actual incremental memory used by the aggregator or indexer at each invocation of aggregate or encode respectively.

Changes
- New flag useMaxMemoryEstimates in the task context. This overrides the same flag in DefaultTaskConfig, i.e. the druid.indexer.task.default.context map. useMaxMemoryEstimates = true (the default value) denotes the current method of estimation.
- New method AggregatorFactory.factorizeWithSize() that returns an AggregatorAndSize, which contains the Aggregator instance and its initial size.
- New method Aggregator.aggregateWithSize(), whose default implementation calls aggregate() and returns 0.
- Remove method DimensionIndexer.estimateEncodedKeyComponentSize().
- Update DimensionIndexer.processRowValsToKeyComponent() to return EncodedKeyComponent<EncodedType>, which contains:
  - EncodedType keyComponent: e.g. int[] for StringDimensionIndexer, Long for LongDimensionIndexer
  - long effectiveSizeBytes: effective size of the key component
- Update OnHeapIncrementalIndex to use the new estimations only if useMaxMemoryEstimates = false (end-to-end flow sketched below).
- Update Aggregator impls.

This PR has: