
Update Aggregator and AggregatorFactory interfaces to improve mem estimates #12073

Merged
merged 17 commits, Feb 3, 2022

Conversation

kfaraz
Contributor

@kfaraz kfaraz commented Dec 15, 2021

Fixes #12022

Description

The current implementation of memory estimation in OnHeapIncrementalIndex:

  • uses guessAggregatorHeapFootprint() to calculate the maximum memory a metric aggregator can use
  • multiplies the above value by the number of aggregators (the same as the number of aggregated rows, i.e. the number of unique row keys)

The current implementation of StringDimensionIndexer:

  • encodes dimension values and calculates the size of both the encoded values and the original dimension values on every row, irrespective of whether the dimension values have already been added to the dictionary

Because of the above, memory usage tends to be over-estimated, which leads to more persist cycles than necessary.

This PR replaces the max-estimation mechanism with a measurement of the actual incremental memory used by the aggregator or indexer at each invocation of aggregate or encode, respectively.


Changes

  • Add a new flag, useMaxMemoryEstimates, in the task context. This overrides the same flag in DefaultTaskConfig, i.e. the druid.indexer.task.default.context map
  • useMaxMemoryEstimates = true (the default) denotes the current method of estimation
  • Add method AggregatorFactory.factorizeWithSize() that returns an AggregatorAndSize which contains
    • the Aggregator instance
    • initial size in bytes of the aggregator
  • Add method Aggregator.aggregateWithSize()
    • returns a long representing the incremental memory used by this aggregation
    • default implementation calls aggregate() and returns 0
  • Remove method DimensionIndexer.estimateEncodedKeyComponentSize()
  • Update the method DimensionIndexer.processRowValsToKeyComponent() to return EncodedKeyComponent<EncodedType> which contains:
    • EncodedType keyComponent: e.g. int[] for StringDimensionIndexer, Long for LongDimensionIndexer
    • long effectiveSizeBytes: Effective size of the key component.
  • Update OnHeapIncrementalIndex to use the new estimations only if useMaxMemoryEstimates = false
  • Update Aggregator impls
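As a rough sketch, the new interface surface described above looks like the following. This is a simplified, hypothetical rendering: the real Druid factorizeWithSize takes a ColumnSelectorFactory, and the CountAggregator here is an illustrative stand-in.

```java
// Pairs an aggregator with its initial on-heap size in bytes
// (which should account for JVM object overhead).
final class AggregatorAndSize {
  final Aggregator aggregator;
  final long initialSizeBytes;

  AggregatorAndSize(Aggregator aggregator, long initialSizeBytes) {
    this.aggregator = aggregator;
    this.initialSizeBytes = initialSizeBytes;
  }
}

interface Aggregator {
  void aggregate();

  // Returns the incremental memory (in bytes) used by this aggregation.
  // The default delegates to aggregate() and reports zero growth, which
  // preserves the old behavior for implementations that do not override it.
  default long aggregateWithSize() {
    aggregate();
    return 0;
  }
}

// A fixed-size aggregator never grows after construction,
// so the default aggregateWithSize() is already correct for it.
class CountAggregator implements Aggregator {
  long count = 0;

  @Override
  public void aggregate() {
    count++;
  }
}
```

With this shape, OnHeapIncrementalIndex can sum the initial size returned at factorize time plus the deltas returned by each aggregateWithSize() call, instead of multiplying a worst-case per-aggregator footprint by the row count.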

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz kfaraz changed the title [WIP] Update Aggregator and AggregatorFactory interfaces to fix mem estimates [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates Dec 15, 2021
@kfaraz kfaraz marked this pull request as ready for review January 3, 2022 04:00

lgtm-com bot commented Jan 3, 2022

This pull request introduces 2 alerts when merging 780e50da8202e31f69d9c6ceacdd42ed4f14e57c into fe71fc4 - view on LGTM.com

new alerts:

  • 1 for Result of multiplication cast to wider type
  • 1 for Useless null check

sketchField.setAccessible(true);
}
catch (NoSuchFieldException | ClassNotFoundException e) {
LOG.error(e, "Could not initialize 'sketchField'");
Contributor

This will only happen if someone happens to have loaded a new/different version of sketches than is actually depended on by this current code. If that happens, this error will put something into the logs that will be ignored (people don't look at logs until something actually explodes) and then silently ignore things. When they are silently ignored, the estimation becomes incorrect and potentially starts causing OOMs where OOMs didn't exist previously. If this happens, it will be super hard to track down why it happened.

I would recommend that we actually explode loudly throwing an error out of the static initializer (which should effectively kill the process from actually starting in the first place). If we want a way for someone to say "I know what I'm doing, ignore this please", we can add an extra config that the error message in the exception points to as a way to ignore things.

Contributor Author

Okay. For now, we can just fail loudly. The additional config can be done as a follow up.

@kfaraz kfaraz force-pushed the fix_heap_mem_estimate branch from 2eaeab1 to 348f17e on January 11, 2022 17:03

lgtm-com bot commented Jan 11, 2022

This pull request introduces 3 alerts when merging 348f17e into eb0bae4 - view on LGTM.com

new alerts:

  • 2 for Unused format argument
  • 1 for Result of multiplication cast to wider type

@kfaraz kfaraz changed the title [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates Update Aggregator and AggregatorFactory interfaces to improve mem estimates Jan 11, 2022
@@ -478,6 +479,11 @@ public IncrementalIndexAddResult add(InputRow row) throws IndexSizeExceededExcep
return add(row, false);
}

public IncrementalIndexAddResult add(InputRow row, boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException
{
return add(row, false, true);
Contributor

This appears to be ignoring skipMaxRowsInMemoryCheck. Is that intentional? Probably worth a comment as to why, if it is.

Contributor Author

Thanks a lot for catching this! I must have missed it.

Member

@clintropolis clintropolis left a comment

overall makes sense 👍

@kfaraz have you done any measurement of the performance impact before/after this change so we know what we are getting ourselves into? Not sure which benchmarks would be most appropriate off the top of my head

@@ -160,4 +173,16 @@ public int getIdForNull()
lock.readLock().unlock();
}
}

private long getObjectSize(@Nonnull T object)
Member

hmm, this method is presumptuous and breaks the contract of this class being generic. I think a size estimator function should be passed into this method, or it needs to be public so that callers can override it, or maybe it should be abstract with a StringDimensionDictionary implementation to minimize function calls, since it's going to be a pretty hot method

Contributor Author

Thanks for pointing this out! I will see how we can make this cleaner.

Contributor

+1.

Contributor Author

Used the StringDimensionDictionary suggestion, although I have not made it abstract so that implementations using the DimensionDictionary can continue to use it as the base concrete class.

public class AggregatorAndSize
{

// TODO: include default overhead for object sizes
Member

nit: unresolved todo

Contributor Author
@kfaraz kfaraz Jan 17, 2022

Fixed. Addressed in the caller.

@@ -127,7 +127,7 @@
* @return An array containing an encoded representation of the input row value.
*/
@Nullable
EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions);
EncodedDimensionValue<EncodedKeyComponentType> processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions);
Member

nit: the javadoc params and return type need updating.. but this isn't really new, it basically needed updating long ago; it is not always an array 😅

Contributor Author

Fixed.

@@ -271,6 +271,7 @@ private Sink getSink(long timestamp)
config.getAppendableIndexSpec(),
config.getMaxRowsInMemory(),
config.getMaxBytesInMemoryOrDefault(),
true,
Member

i've noticed a few places that aren't wired up to config are using true, so using the new behavior with no way out, is that intentional? this one in particular probably doesn't matter all that much these days (i hope), but i'm less sure about all of them, and it isn't consistent because i see some hard coded false in there too

Contributor Author
@kfaraz kfaraz Jan 12, 2022

The value of the flag useMaxMemoryEstimates = true represents old behaviour.

The hard coding has been done only for the following classes:

  • RealtimePlumber and related classes (hopefully not used anymore)
  • OnHeapIncrementalIndexBenchmark (accidentally hardcoded to false, fixing this)

It should not be hard-coded anywhere else except maybe tests.

@@ -557,12 +573,17 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row)
DimensionIndexer indexer = desc.getIndexer();
Object dimsKey = null;
try {
dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);
final EncodedDimensionValue<?> encodedDimensionValue
= indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);
Member

i guess due to the way this refactor was done (compared to aggs) there is no real way to turn off calculating the estimates, even if we aren't using them. maybe it doesn't matter if there is no/minimal performance impact

Contributor Author
@kfaraz kfaraz Jan 12, 2022

Yes, I was not too sure about this either. I will take another look and see if we can separate the two flows without too much duplication.

Contributor Author

Updated DimensionDictionary and StringDimensionIndexer to turn off estimations if not needed.

@kfaraz
Contributor Author

kfaraz commented Jan 12, 2022

have you done any measurement of the performance impact before/after this change so we know what we are getting ourselves into?

Thanks for the review, @clintropolis !
I don't have enough perf numbers yet which is why I have put the changes behind a flag for the time being.
I am working on the perf evaluation. Once we are satisfied with the numbers, we can get rid of the flag altogether.

Contributor

@abhishekagarwal87 abhishekagarwal87 left a comment

Looks good overall. You should also document the approach that was finalized in the proposal and this implementation is based on. I think AggregatorFactory#factorizeWithSize is probably a good place for writing that doc and overall approach.

@@ -42,6 +42,12 @@
{
void aggregate();

default long aggregateWithSize()
Contributor

this method needs javadocs

Contributor Author

Thanks for the reminder, @abhishekagarwal87. I am adding javadocs wherever missing.

Member

what is the contract of this value, I don't see the word estimate in here, but think it probably should be... should implementors over-estimate if exact sizing is not possible or is under-estimating fine? Should there be a warning that the default estimate is used? (i imagine this would be very noisy if it is done per aggregate call... so don't really recommend doing it here or anything...)

Contributor Author

Should there be a warning that the default estimate is used?

It would make sense to give this warning when factorizeWithSize is overridden but aggregateWithSize is not. In such a case, we might be significantly underestimating the memory usage.

As you said, doing it here might be noisy. A viable approach could be to have factorizeWithSize return a wrapper Aggregator which does not allow the regular aggregate (will be addressed in the subsequent PR).
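The wrapper idea mentioned above could look something like this. Purely illustrative: no such wrapper exists in this PR, and the class and interface names here are hypothetical stand-ins for the real Druid types.

```java
// Minimal stand-in for the real Aggregator interface.
interface Aggregator {
  void aggregate();

  default long aggregateWithSize() {
    aggregate();
    return 0;
  }
}

// Hypothetical wrapper that factorizeWithSize could return: it rejects the
// size-unaware aggregate() path so that callers cannot silently bypass
// memory accounting.
class SizeRequiredAggregator implements Aggregator {
  private final Aggregator delegate;

  SizeRequiredAggregator(Aggregator delegate) {
    this.delegate = delegate;
  }

  @Override
  public void aggregate() {
    throw new UnsupportedOperationException(
        "Use aggregateWithSize() so memory estimates stay accurate");
  }

  @Override
  public long aggregateWithSize() {
    return delegate.aggregateWithSize();
  }
}
```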

// TODO: include default overhead for object sizes

private final Aggregator aggregator;
private final long initialSizeBytes;
Contributor

can you add more info like is this total on-heap footprint that includes JVM object overhead or that overhead is not considered in this initial size?

Contributor Author

Added. It should account for JVM object overhead too.

{
@Nullable
private final K value;
private final long incrementalSizeBytes;
Contributor

can you add a comment on what this thing is the incremental size of

Contributor Author

Added.


@@ -80,6 +81,14 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory)
return new SketchAggregator(selector, size);
}

@Override
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
Contributor

this needs some documentation.

Contributor Author

Added.

@kfaraz
Contributor Author

kfaraz commented Jan 17, 2022

You should also document the approach that was finalized in the proposal and this implementation is based on.

@abhishekagarwal87 , I have added an overview of the approach in OnHeapIncrementalIndex.
I have also added javadocs for the methods in Aggregator and AggregatorFactory, but they contain information only relevant to those methods.

Please let me know if this is sufficient.


lgtm-com bot commented Jan 17, 2022

This pull request introduces 1 alert when merging 9908c0b into b55f7a2 - view on LGTM.com

new alerts:

  • 1 for Result of multiplication cast to wider type


*
* @return AggregatorAndSize which contains the actual aggregator and its initial size.
*/
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
Member

same question about the contract for the returned sizes. Also, I wonder if there is anything we could do to make sure this method is overridden if aggregateWithSize is implemented, so that the initial size is not the max size...

Contributor Author

Updated the javadoc to advise on the required estimation.

Also, I wonder if there is anything we could do to make sure this method is overridden if aggregateWithSize is implemented

I guess it is okay even if it isn't overridden because we would only be overestimating which would not cause failures, only somewhat poorer estimates.

Contributor Author

The other way around is probably more of an issue i.e. overriding factorizeWithSize but not aggregateWithSize. In such a case, factorizeWithSize would give a small initial size, which would never increase because aggregateWithSize would always return 0.

In either case, this issue is a problem if we use the new behaviour, i.e. useMaxMemoryEstimates = false. I guess we could address this in a subsequent PR.

{
final int[] encodedDimensionValues;
final int oldDictSize = dimLookup.size();
final long oldDictSizeInBytes = useMaxMemoryEstimates ? 0 : dimLookup.sizeInBytes();
Member

nit: this is only used inside of the else near the end of the method

Contributor Author

We need the size of the dictionary before adding the dimension values.
At the end, we take the final size of dictionary and check the diff.
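The before/after diff pattern described here can be illustrated with a toy dictionary. The per-entry size formula below is a made-up stand-in for Druid's actual string footprint estimate, and ToyDictionary is a hypothetical class, not the real DimensionDictionary.

```java
import java.util.HashMap;
import java.util.Map;

// Toy dictionary: only genuinely new entries grow sizeInBytes, so the
// caller can measure incremental memory as (sizeAfter - sizeBefore).
class ToyDictionary {
  private final Map<String, Integer> idLookup = new HashMap<>();
  private long sizeInBytes = 0;

  int add(String value) {
    return idLookup.computeIfAbsent(value, v -> {
      // Illustrative cost model: fixed object overhead + 2 bytes per char.
      sizeInBytes += 16 + 2L * v.length();
      return idLookup.size();
    });
  }

  long sizeInBytes() {
    return sizeInBytes;
  }
}
```

A caller would snapshot sizeInBytes() before encoding a row's values and take the difference afterwards: the delta is positive the first time a value is seen and zero on repeats, which is exactly why the pre-add snapshot is needed at the top of the method.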

{
super(useMaxMemoryEstimates ? new DimensionDictionary<>() : new StringDimensionDictionary());
Member

nit: this seems strange/confusing, why wouldn't StringDimensionIndexer always use StringDimensionDictionary here? it seems like it could just pass in a value of false to control computeOnHeapSize instead of sometimes not using StringDimensionDictionary

Contributor Author

You are right, it is weird.
Fixed it.

Comment on lines 50 to 58
static {
try {
SKETCH_FIELD = Class.forName("org.apache.datasketches.theta.UnionImpl")
.getDeclaredField("gadget_");
SKETCH_FIELD.setAccessible(true);
}
catch (NoSuchFieldException | ClassNotFoundException e) {
throw new ISE(e, "Could not initialize SketchAggregator");
}
Member

this seems worth a comment, and maybe a link to the code?

Contributor

Exceptions in static initialization blocks can surface as very weird errors: http://javaeesupportpatterns.blogspot.com/2012/07/javalangnoclassdeffounderror-how-to.html

Maybe we can just move this initialization to the constructor? We need not worry too much about thread safety since it's ok even if SKETCH_FIELD gets constructed twice.
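For illustration, the fail-fast static-initializer pattern being weighed here looks like the following in miniature. SketchHolder and its gadget field are hypothetical stand-ins; the real code reflects on the private gadget_ field of org.apache.datasketches.theta.UnionImpl.

```java
import java.lang.reflect.Field;

// Stand-in for the sketch class whose private field is accessed reflectively.
class SketchHolder {
  private long gadget = 42; // illustrative private field
}

class SketchFieldAccess {
  static final Field SKETCH_FIELD;

  static {
    try {
      // Failing loudly here stops the process at class-load time instead of
      // logging an error and letting memory estimates silently degrade.
      SKETCH_FIELD = SketchHolder.class.getDeclaredField("gadget");
      SKETCH_FIELD.setAccessible(true);
    }
    catch (NoSuchFieldException e) {
      throw new IllegalStateException("Could not initialize SketchFieldAccess", e);
    }
  }
}
```

The trade-off discussed above is where the failure surfaces: a static block fails once, at first class use, as a hard-to-read NoClassDefFoundError on later accesses, while constructor initialization fails per-instance with a clearer stack trace but risks repeating the work.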

Member

@clintropolis clintropolis left a comment

👍

@abhishekagarwal87
Contributor

@kfaraz CI failures might be legit. Can you fix those before merging?

Successfully merging this pull request may close these issues.

[Proposal] Improve memory estimations in OnHeapIncrementalIndex