[Proposal] Improve memory estimations in OnHeapIncrementalIndex
#12022
Comments
This approach actually seems like a good idea to me for a few reasons:
Other than the point in the comment above about the aggregator interface, this design looks good to me. There's one potential issue I wanted to bring up: right now, there are other sources of overhead that the estimation doesn't capture. It's mostly fine because the current estimation is an overestimate, so it "covers", in a sense, for other things that are not explicitly estimated. So when we make these estimates more accurate, we may find we need to add estimation for more kinds of things. One example is the building of bitmaps during persist, which IIRC happens on heap. I've noticed in the past that this can have a substantial footprint too. You should be able to repro a case where the footprint is big by having a multi-value column that has 100+ values per row. I was working with a dataset like that when I first noticed this bitmap thing. For this particular one, maybe we can set a max dictionary size?
Thanks a lot for the feedback, @gianm!
Thanks for calling this out; I will try to find other such sources of overhead that are not being estimated right now.
While this proposal does introduce a pretty big backwards incompatibility, and I hate introducing such incompatibilities, doing this in the proposed backwards-compatible way adds a branch after every single call to the aggregator which, given that this is a hot method, could have significant inadvertent impacts on query performance. Having the method be guaranteed to return a value that is always added minimizes that impact. That said, with just the proposed interface, the conditional still moves into the implementation of the Aggregator itself for simple aggregators like a sum aggregator, where it's a fixed memory size from initialization.

Thinking about this gave me another idea for how to minimize the incompatibility: we could add a default method such as `aggregateWithSize()` alongside the existing `aggregate()`. Alternatively, instead of adding a method to the Aggregator interface, we could give the `AggregatorFactory` a way to report the size. The least-risk-of-impact approach is probably to add the default method.

Perhaps we try the fully backwards-compatible approach and benchmark it, then adjust to the incompatible, non-default method, benchmark that, and see if there's a marked difference between them?
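A minimal sketch of the backwards-compatible variant discussed in this comment, assuming the method name `aggregateWithSize()` (as in the eventual PR description) and a `-1` sentinel with caller-side fallback, both of which are assumptions rather than the final design:

```java
// Sketch only: names and the sentinel convention are assumptions.
interface Aggregator
{
  void aggregate();

  // Default keeps existing implementations source-compatible; the caller
  // must branch on the sentinel, which is the per-call cost discussed above.
  default long aggregateWithSize()
  {
    aggregate();
    return -1; // sentinel: caller substitutes the max estimated size
  }
}

class SizeAwareCaller
{
  long totalSizeBytes;

  void onRow(Aggregator agg, long maxEstimatedSize)
  {
    long delta = agg.aggregateWithSize();
    // The extra branch on the hot path that the comment above worries about.
    totalSizeBytes += (delta < 0) ? maxEstimatedSize : delta;
  }
}
```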
To me this feels like a change that likely doesn't require incompatibility in order to perform well, for a couple specific reasons (below). So I do think we should do benchmarking before deciding to make things incompatible.
I was thinking that query-time callers would stick with `aggregate()`.
This one seems like the kind of thing that the JVM should be able to optimize. Also, for the reason above, it's a code path that I'd only expect to get called during ingestion anyway, where aggregation time is not as big a driver of overall performance as it is during queries.
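Since both comments suggest benchmarking before deciding, a JMH micro-benchmark along these lines could quantify the branch cost; a hypothetical sketch (the stand-in aggregator and the 64-byte fallback are illustrative, not Druid code):

```java
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class AggregateCallBenchmark
{
  private long sum;

  // Stand-in for a simple sum aggregator; not Druid's actual class.
  private void aggregate()
  {
    sum += 1;
  }

  private long aggregateWithSize()
  {
    aggregate();
    return -1; // fixed-footprint aggregator: sentinel, no incremental allocation
  }

  @Benchmark
  public long plainAggregate()
  {
    aggregate();
    return sum;
  }

  @Benchmark
  public long aggregateWithSizeAndBranch()
  {
    long delta = aggregateWithSize();
    return (delta < 0) ? 64 : delta; // branch plus fallback to a max estimate
  }
}
```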
Based on the discussion above, we will modify the aggregator interfaces as follows:

- Interface `AggregatorFactory`: add a method `factorizeWithSize()` that returns an `AggregatorAndSize` holding the aggregator instance and its estimated initial size in bytes.
- Interface `Aggregator`: add a method `aggregateWithSize()` that returns the incremental memory in bytes used by that aggregation step.

In the first iteration, we would put the new estimation logic behind a feature flag. After some more testing, the flag can be removed altogether. The following metrics will also be added (if they don't already exist):
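A sketch of the two interface changes summarized above, based on the method names in the summary and the PR description; the exact shape of `AggregatorAndSize` and the default-method bodies are assumptions:

```java
// Sketch only: field names, accessors, and defaults are assumptions.
interface Aggregator
{
  void aggregate();

  // Returns the incremental memory in bytes used by this aggregation step.
  long aggregateWithSize();
}

class AggregatorAndSize
{
  private final Aggregator aggregator;
  private final long initialSizeBytes; // estimated initial on-heap footprint

  AggregatorAndSize(Aggregator aggregator, long initialSizeBytes)
  {
    this.aggregator = aggregator;
    this.initialSizeBytes = initialSizeBytes;
  }

  Aggregator getAggregator()
  {
    return aggregator;
  }

  long getInitialSizeBytes()
  {
    return initialSizeBytes;
  }
}

interface AggregatorFactory
{
  Aggregator factorize();

  long getMaxIntermediateSizeBytes(); // hypothetical accessor for the max estimate

  // Returns the aggregator together with its estimated initial size, falling
  // back to the max estimate so existing factories keep working unchanged.
  default AggregatorAndSize factorizeWithSize()
  {
    return new AggregatorAndSize(factorize(), getMaxIntermediateSizeBytes());
  }
}
```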
Fixes #12022

### Description

The current implementations of memory estimation in `OnHeapIncrementalIndex` and `StringDimensionIndexer` tend to over-estimate, which leads to more persistence cycles than necessary. This PR replaces the max-estimation mechanism with getting the incremental memory used by the aggregator or indexer at each invocation of `aggregate` or `encode` respectively.

### Changes

- Add new flag `useMaxMemoryEstimates` in the task context. This overrides the same flag in DefaultTaskConfig, i.e. the `druid.indexer.task.default.context` map (see the example below)
- Add method `AggregatorFactory.factorizeWithSize()` that returns an `AggregatorAndSize` which contains the aggregator instance and the estimated initial size of the aggregator
- Add method `Aggregator.aggregateWithSize()` which returns the incremental memory used by this aggregation step
- Update the method `DimensionIndexer.processRowValsToKeyComponent()` to return the encoded key component as well as its effective size in bytes
- Update `OnHeapIncrementalIndex` to use the new estimations only if `useMaxMemoryEstimates = false`
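For illustration, the per-task override of the flag might look like this in an ingestion spec's context (an abbreviated, hypothetical snippet; only the flag name comes from the PR description):

```json
{
  "type": "index_parallel",
  "context": {
    "useMaxMemoryEstimates": false
  }
}
```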
Changes:
- Use apache datasketches version 3.2.0.
- Remove unsafe reflection-based usage of datasketch internals added in #12022
Motivation
The existing implementation in `OnHeapIncrementalIndex` tends to over-estimate memory usage, thus leading to more persistence cycles than necessary during ingestion. A more accurate estimation of memory usage would also free it up for other purposes.
The recent changes in #11950 have made improvements by adding a `guessAggregatorHeapFootprint()` method to the `Aggregator` interface. This proposal also addresses the same problem but advocates replacing the guess mechanism with getting the actual incremental memory used by an aggregator.

Proposed changes
- Change `Aggregator.aggregate()` to return a `long` instead of `void`. The returned long would represent the incremental memory in bytes used by the aggregator in that particular invocation of the method.
- Remove `DimensionIndexer.estimateEncodedKeyComponentSize()` and change `DimensionIndexer.getUnsortedEncodedValueFromSorted()` to return objects of a new generic class `EncodedDimensionValue<EncodedType>` (sketched below) which contains:
  - `EncodedType value`: e.g. `int[]` for `StringDimIndexer`, `Long` for `LongDimIndexer`
  - `long incrementalSize`: the delta in size required for the `value`. For numerical values, e.g. in `LongDimensionIndexer`, it would just be the size of the datatype. But for `StringDimensionIndexer`, which returns an encoded `int[]`, it would represent the size of the array and also of any new dimension value that has been encoded into the dictionary in this invocation.

Simply put, `getUnsortedEncodedValueFromSorted()` now returns a payload and also the memory required for that payload.
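A minimal sketch of the proposed container class, assuming plain constructor and accessor names not spelled out in the proposal:

```java
// Sketch only: accessor names are assumptions.
public class EncodedDimensionValue<EncodedType>
{
  private final EncodedType value;    // e.g. int[] for strings, Long for longs
  private final long incrementalSize; // bytes newly required to store the value

  public EncodedDimensionValue(EncodedType value, long incrementalSize)
  {
    this.value = value;
    this.incrementalSize = incrementalSize;
  }

  public EncodedType getValue()
  {
    return value;
  }

  public long getIncrementalSize()
  {
    return incrementalSize;
  }
}
```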
Rationale
Estimation of a row size
row size = aggregator size + dims key size + overhead
Aggregator Size
Currently, we compute the max size for the aggregator and use it for every row, which overestimates the actual memory usage. With the proposed change, we would be using the actual footprint for each row and not the max, thus getting more accurate estimates, as illustrated in the sketch below.
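To make the difference concrete, compare a fixed-footprint sum aggregator with one that actually grows; an illustrative sketch (neither class is Druid's real implementation, and the per-entry footprint is an assumed constant):

```java
import java.util.HashSet;
import java.util.Set;

// Fixed footprint: all memory is allocated up front, so each step reports
// zero growth and only the initial size (reported at creation) is charged.
class LongSumAggregatorSketch
{
  private long sum;

  long aggregateWithSize(long input)
  {
    sum += input;
    return 0;
  }
}

// Growing footprint: charges a delta only when it actually allocates,
// instead of charging the max possible size for every row.
class DistinctValuesAggregatorSketch
{
  private static final long ESTIMATED_ENTRY_FOOTPRINT_BYTES = 48; // assumed
  private final Set<String> values = new HashSet<>();

  long aggregateWithSize(String input)
  {
    return values.add(input) ? ESTIMATED_ENTRY_FOOTPRINT_BYTES : 0;
  }
}
```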
Dims Key Size
The estimation of the dimension key size already takes into account the current row and not the max size. But here, the overestimates are caused by repeatedly adding the footprint of the same String values (especially in the case of multi-valued dimensions), which are in fact stored only once in the dictionary, while only the integer codes are present in every row. With the proposed change, we add the footprint of a String value only once, when it is newly added to the dictionary (see the sketch below).
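A sketch of this once-only accounting, using a simplified stand-in for the dictionary inside `StringDimensionIndexer` (the per-entry overhead constant is an assumption):

```java
import java.util.HashMap;
import java.util.Map;

class StringDictionarySketch
{
  private static final long PER_ENTRY_OVERHEAD_BYTES = 48; // assumed map-entry overhead
  private final Map<String, Integer> idLookup = new HashMap<>();
  private long lastDeltaBytes;

  // Encodes a value, charging its footprint only the first time it is seen.
  int encode(String value)
  {
    Integer id = idLookup.get(value);
    if (id != null) {
      lastDeltaBytes = 0; // already in the dictionary: costs nothing new
      return id;
    }
    id = idLookup.size(); // assign the next sequential code
    idLookup.put(value, id);
    lastDeltaBytes = 2L * value.length() + PER_ENTRY_OVERHEAD_BYTES;
    return id;
  }

  // Incremental bytes added by the most recent encode() call.
  long getLastDeltaBytes()
  {
    return lastDeltaBytes;
  }
}
```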
Backwards Compatibility
Neither of the changes mentioned above would be backwards compatible.
Aggregator Size
Any class implementing the `Aggregator` interface would need to be fixed to return a `long` instead of `void`.

Workaround (Rejected):
To retain compatibility, we could add a new `default long aggregateAndEstimateMemory()` method and leave the existing `aggregate()` method as is. The default implementation would return a negative value (say -1), in which case the `OnHeapIncrementalIndex` or any other estimator would use the max estimated size for that invocation. But this approach would be pretty hacky and problematic in the long run, as callers of the `Aggregator` would be free to call either of the two aggregate methods, thus producing widely different and erroneous memory estimations.

Dims Key Size
Any class implementing the `DimensionIndexer` would have to be fixed. (This change is less of a compatibility concern as compared to the `Aggregator` change, as the `DimensionIndexer` has fewer implementations.)

Workaround (Rejected):
To retain compatibility, we could retain `DimensionIndexer.estimateEncodedKeyComponentSize()`, which would account for a String value only when it is newly encountered. But this would require having two dictionaries: the first used by the method `getUnsortedEncodedValueFromSorted()` to track encoded String values, and the second used by `estimateEncodedKeyComponentSize()` to track estimated String values. This workaround would introduce the unnecessary complexity and overhead of maintaining two dictionaries.
Operational impact
Future Work (optional)
The memory estimate values returned by the updated `aggregate()` method could be used by other callers (such as `TimeseriesQueryEngine`) to estimate memory if required.