[Proposal] Improve memory estimations in OnHeapIncrementalIndex #12022

Closed
kfaraz opened this issue Dec 3, 2021 · 6 comments · Fixed by #12073

Comments

@kfaraz
Contributor

kfaraz commented Dec 3, 2021

Motivation

The existing implementation in OnHeapIncrementalIndex tends to overestimate memory usage,
which leads to more persistence cycles than necessary during ingestion. A more accurate estimate
of memory usage would also free up memory for other purposes.

The recent changes in #11950 have made improvements by adding a guessAggregatorHeapFootprint() method to the Aggregator interface. This proposal addresses the same problem but advocates replacing the guess mechanism with the actual incremental memory used by an aggregator.

Proposed changes

  • Update the method Aggregator.aggregate() to return a long instead of void. The returned long would represent the incremental memory in bytes used by the aggregator in that particular invocation of the method.
  • Remove the method DimensionIndexer.estimateEncodedKeyComponentSize()
  • Update the method DimensionIndexer.getUnsortedEncodedValueFromSorted() to return objects of a new generic class EncodedDimensionValue<EncodedType> which contains:
    • EncodedType value: e.g. int[] for StringDimIndexer, Long for LongDimIndexer
    • long incrementalSize: The delta in size required for the value. For numerical values, e.g. in LongDimensionIndexer, it would just be the size of the datatype. But for StringDimensionIndexer, which returns an encoded int[], it would represent the size of the array plus the size of any new dimension value that has been encoded into the dictionary in this invocation. Simply put, getUnsortedEncodedValueFromSorted() would now return a payload and also the memory required for that payload.
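
A minimal sketch of what such a holder could look like, assuming a plain immutable class. The accessor names and the `LongEncodingSketch` helper are hypothetical and only illustrate the "payload plus incremental size" idea; they are not taken from the Druid codebase.

```java
/**
 * Hypothetical holder: an encoded value plus the incremental heap size
 * (in bytes) attributed to producing it.
 */
final class EncodedDimensionValue<EncodedType> {
    private final EncodedType value;
    private final long incrementalSize;

    EncodedDimensionValue(EncodedType value, long incrementalSize) {
        this.value = value;
        this.incrementalSize = incrementalSize;
    }

    EncodedType getValue() {
        return value;
    }

    long getIncrementalSize() {
        return incrementalSize;
    }
}

/** Illustrative stand-in for a numeric indexer; not the real LongDimensionIndexer. */
final class LongEncodingSketch {
    // For a numeric dimension the delta is just the size of the datatype.
    static EncodedDimensionValue<Long> encode(long v) {
        return new EncodedDimensionValue<>(v, Long.BYTES);
    }
}
```

A caller would add `getIncrementalSize()` to its running row-size estimate and store `getValue()` as the key component.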

Rationale

Estimation of a row size
row size = aggregator size + dims key size + overhead

Aggregator Size
Currently, we compute the max size for the aggregator and use it for every row, which overestimates the actual memory usage. With the proposed change, we would use the actual footprint for each row, not the max, and thus get more accurate estimates.

Dims Key Size
The estimation of the dimension key size already takes into account the current row and not the max size. But here, the overestimates are caused by repeatedly adding the footprint of the same String values (especially in the case of multi-valued dimensions), which are in fact stored only once in the dictionary; only the integer codes are present in every row. With the proposed change, we add the footprint of a String value only once, when it is newly added to the dictionary.
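
The "count each String only once" idea can be sketched as follows. This is an illustration under stated assumptions, not the StringDimensionIndexer implementation: `EncodedRow`, `StringDictionarySketch`, and the 2-bytes-per-char estimate in `estimateStringSize()` are all hypothetical, and object and array headers are ignored.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical result type: dictionary codes plus the incremental size in bytes. */
final class EncodedRow {
    final int[] codes;
    final long incrementalSize;

    EncodedRow(int[] codes, long incrementalSize) {
        this.codes = codes;
        this.incrementalSize = incrementalSize;
    }
}

final class StringDictionarySketch {
    private final Map<String, Integer> dictionary = new HashMap<>();

    // Assumed rough estimate: 2 bytes per char, no object overhead.
    static long estimateStringSize(String s) {
        return 2L * s.length();
    }

    EncodedRow encode(String... values) {
        int[] codes = new int[values.length];
        // Every row pays for its own int codes.
        long size = (long) Integer.BYTES * values.length;
        for (int i = 0; i < values.length; i++) {
            Integer code = dictionary.get(values[i]);
            if (code == null) {
                // New value: add it to the dictionary and charge for the String once.
                code = dictionary.size();
                dictionary.put(values[i], code);
                size += estimateStringSize(values[i]);
            }
            codes[i] = code;
        }
        return new EncodedRow(codes, size);
    }
}
```

With this accounting, a second row that repeats already-seen values is charged only for its integer codes.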

Backwards Compatibility

Neither of the changes mentioned above would be backwards compatible.

Aggregator Size
Any class implementing the Aggregator interface would need to be fixed to return a long instead of void.

Workaround (Rejected):
To retain compatibility, we could add a new default long aggregateAndEstimateMemory() method and leave the existing aggregate() method as is. The default implementation would return a negative value (say -1), in which case the OnHeapIncrementalIndex or any other estimator would use the max estimated size for that invocation.

But this approach would be pretty hacky and problematic in the long run, as callers of the Aggregator would be free to call either of the two aggregate methods, thus producing widely different and erroneous memory estimations.
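
For concreteness, the rejected workaround might look roughly like the sketch below. `SketchAggregator` and `RowSizeSketch` are hypothetical, simplified stand-ins (the real Aggregator interface has more methods), and the -1 sentinel follows the description above.

```java
/** Simplified, hypothetical stand-in for the Aggregator interface. */
interface SketchAggregator {
    void aggregate();

    // Default for legacy implementations: aggregate, then report "no estimate".
    default long aggregateAndEstimateMemory() {
        aggregate();
        return -1;
    }
}

final class RowSizeSketch {
    // The estimator has to branch on the sentinel after every call.
    static long rowDelta(SketchAggregator agg, long maxEstimatedSize) {
        long delta = agg.aggregateAndEstimateMemory();
        return delta < 0 ? maxEstimatedSize : delta;
    }
}
```

The branch in `rowDelta()` is the fallback to the max estimate; a caller that invokes plain `aggregate()` instead gets no estimate at all, which is the inconsistency described above.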

Dims Key Size
Any class implementing the DimensionIndexer would have to be fixed. (This change is less of a compatibility concern than the Aggregator change, as the DimensionIndexer has fewer implementations.)

Workaround (Rejected):
To retain compatibility, we could keep DimensionIndexer.estimateEncodedKeyComponentSize() and have it account for a String value only when it is newly encountered. But this would require having two dictionaries: the first used by the method getUnsortedEncodedValueFromSorted() to track encoded String values, and the second used by estimateEncodedKeyComponentSize() to track estimated String values.

This workaround would introduce unnecessary complexity and overhead of maintaining two dictionaries.

Operational impact

  • All aggregator implementations in extensions would have to be updated.
  • Rolling Upgrade: Not affected

Future Work (optional)

The memory estimate values returned by the updated aggregate() method could be used by other callers (such as TimeseriesQueryEngine) to estimate memory if required.

@gianm
Contributor

gianm commented Dec 3, 2021

> Workaround (Rejected):
> To retain compatibility, we could add a new default long aggregateAndEstimateMemory() method and leave the existing aggregate() method as is. The default implementation would return a negative value (say -1) in which case the OnHeapIncrementalIndex or any other estimator would use the max estimated size for that invocation.

This approach actually seems like a good idea to me for a few reasons:

  • Aggregation is hot code, and there will be some overhead to size estimation, and query-time callers don't necessarily need it. So having two methods allows the caller to decide if it actually needs size estimates or not.
  • We want to avoid incompatible changes to aggregation interfaces whenever possible.

@gianm
Contributor

gianm commented Dec 3, 2021

Other than the point in the comment above about the aggregator interface, this design looks good to me.

There's one potential issue I wanted to bring up: right now, there are other sources of overhead that the estimation doesn't capture. It's mostly fine because the current estimation is an overestimate, so it "covers" in a sense for other things that are not explicitly estimated. So when we make these estimates more accurate, we may find we need to add estimation for more kinds of things.

One example is the building of bitmaps during persist, which IIRC happens on heap. I've noticed in the past that this can have a substantial footprint too. You should be able to repro a case where the footprint is big by having a multi-value column that has 100+ values per row. I was working with a dataset like that when I first noticed this bitmap thing. For this particular one, maybe we can set a max dictionary size?

@kfaraz
Contributor Author

kfaraz commented Dec 4, 2021

Thanks a lot for the feedback, @gianm !

> One example is the building of bitmaps during persist, which IIRC happens on heap. I've noticed in the past that this can have a substantial footprint too.

Thanks for calling this out, I will try to find other such sources of overhead that are not being estimated right now.

@cheddar
Contributor

cheddar commented Dec 4, 2021

> > Workaround (Rejected):
> > To retain compatibility, we could add a new default long aggregateAndEstimateMemory() method and leave the existing aggregate() method as is. The default implementation would return a negative value (say -1) in which case the OnHeapIncrementalIndex or any other estimator would use the max estimated size for that invocation.
>
> This approach actually seems like a good idea to me for a few reasons:
>
> * Aggregation is hot code, and there will be some overhead to size estimation, and query-time callers don't necessarily need it. So having two methods allows the caller to decide if it actually needs size estimates or not.
> * We want to avoid incompatible changes to aggregation interfaces whenever possible.

While this proposal does introduce a pretty big backwards incompatibility and I hate introducing such incompatibilities, doing this in the proposed backwards compatible way adds a branch after every single call to the aggregator which, given that this is a hot method, could have a significant inadvertent impact on query performance. Having the method be guaranteed to return a value that is always added minimizes that impact.

That said, with just the proposed interface, the conditional still moves into the implementation of the Aggregator itself for simple aggregators like a sum aggregator, where the memory size is fixed at initialization, because the first call to aggregate() doubles as the "initialization" call.

Thinking about this gave me another idea for how to minimize the incompatibility:

We could add a method getInitializedMemorySize() which returns how much memory is consumed by just being initialized. For static-sized aggregators, this would essentially equate to returning the current size estimation from this one method and then always returning 0 from the aggregate() call.

Alternatively, instead of adding a method to the Aggregator interface, we could give AggregatorFactory an AggregatorFactory.factorizeWithSize() which returns a "holder" object that has both a reference to the Aggregator and its size. Given that AggregatorFactory is the thing that knows the intermediate size right now, this approach would actually allow for a default implementation that pushes the current size into the return object. If we then create an Aggregator.aggregateWithSize method, it could default to an implementation that calls aggregate and returns 0. That would actually make this a completely backwards compatible change. My only worry with that is that the most common implementation of Aggregator.aggregate would likely be via that default implementation and I'm not sure if this would be adding yet another virtual function call to the hot path.

The approach with the least risk of performance impact is probably to add the factorizeWithSize() and still give aggregate a return type. This is backwards incompatible, but the vast majority of implementations just need to add a return 0; so at least the required change is minimal.

Perhaps we try the fully backwards compatible approach and benchmark it, then switch to the incompatible, non-default method, benchmark that, and see if there's a marked difference between them?

@gianm
Contributor

gianm commented Dec 4, 2021

To me this feels like a change that likely doesn't require incompatibility in order to perform well, for a couple specific reasons (below). So I do think we should do benchmarking before deciding to make things incompatible.

> doing this in the proposed backwards compatible way adds a branch after every single call to the aggregator which, given that this is a hot method, could have significant inadvertent impacts to query performance

I was thinking that query-time callers would stick with void aggregate, so no branching would be needed on those paths. Currently the users of Aggregator are timeseries and topN, which have a natural limitation on the number of Aggregators they create anyway: timeseries only makes one set per time bucket and topN has its threshold. In practice I haven't noticed any issues with these query types running out of memory, so I don't think there is a reason to add the overhead of size estimation.

> My only worry with that is that the most common implementation of Aggregator.aggregate would likely be via that default implementation and I'm not sure if this would be adding yet another virtual function call to the hot path.

This one seems like the kind of thing that the JVM should be able to optimize. Also, for the reason above, it's a code path that I'd only expect to get called during ingestion anyway, where aggregation time is not as big a driver of overall performance as it is during queries.

@kfaraz
Contributor Author

kfaraz commented Dec 6, 2021

Based on the discussion above, we will modify the aggregator interfaces as follows:

Interface AggregatorFactory will get a new factorizeWithSize() method which returns a structure containing both the Aggregator instance as well as the initial memory size.

Interface Aggregator will get a new aggregateWithSize() method which returns a long representing the incremental memory used in that invocation of aggregateWithSize(). The default impl of this method would call aggregate() and return 0. Aggregators such as sum can rely on the default impl, always returning 0 from aggregateWithSize(), which effectively makes the aggregator size the same as the initial size returned from AggregatorFactory.factorizeWithSize().
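
A rough sketch of how the two new methods could fit together, using simplified stand-in interfaces rather than the real Druid ones. The names `SimpleAggregator`, `SimpleAggregatorFactory`, `AggregatorAndSizeSketch`, `maxIntermediateSizeBytes()`, and the count aggregator are hypothetical and exist only for this illustration.

```java
/** Simplified, hypothetical stand-in for the Aggregator interface. */
interface SimpleAggregator {
    void aggregate();

    // Default: fixed-size aggregators report no incremental memory.
    default long aggregateWithSize() {
        aggregate();
        return 0;
    }
}

/** Hypothetical holder for an aggregator and its initial size in bytes. */
final class AggregatorAndSizeSketch {
    final SimpleAggregator aggregator;
    final long initialSizeBytes;

    AggregatorAndSizeSketch(SimpleAggregator aggregator, long initialSizeBytes) {
        this.aggregator = aggregator;
        this.initialSizeBytes = initialSizeBytes;
    }
}

/** Simplified, hypothetical stand-in for the AggregatorFactory interface. */
interface SimpleAggregatorFactory {
    SimpleAggregator factorize();

    // Assumed accessor for the existing max-size estimate.
    long maxIntermediateSizeBytes();

    // Default: reuse the existing max estimate as the initial size.
    default AggregatorAndSizeSketch factorizeWithSize() {
        return new AggregatorAndSizeSketch(factorize(), maxIntermediateSizeBytes());
    }
}

final class CountAggregator implements SimpleAggregator {
    long count;

    @Override
    public void aggregate() {
        count++;
    }
}

final class CountAggregatorFactory implements SimpleAggregatorFactory {
    @Override
    public SimpleAggregator factorize() {
        return new CountAggregator();
    }

    @Override
    public long maxIntermediateSizeBytes() {
        return Long.BYTES;
    }
}
```

An estimator would add `initialSizeBytes` once when the aggregator is created and then add the return value of each `aggregateWithSize()` call. For a fixed-size aggregator like the count example, the total stays at the initial size.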

In the first iteration, we would put the new estimation logic behind a feature flag. After some more testing, the flag can be removed altogether.

The following metrics will also be added (if they don't already exist):

  • number of incremental persists
  • num rows/num bytes in each persist
  • time taken to fill up buffer
  • time taken to persist buffer

kfaraz added a commit that referenced this issue Feb 3, 2022
Fixes #12022  

### Description
The current implementations of memory estimation in `OnHeapIncrementalIndex` and `StringDimensionIndexer` tend to over-estimate which leads to more persistence cycles than necessary.

This PR replaces the max estimation mechanism with getting the incremental memory used by the aggregator or indexer at each invocation of `aggregate` or `encode` respectively.

### Changes
- Add new flag `useMaxMemoryEstimates` in the task context. This overrides the same flag in DefaultTaskConfig i.e. `druid.indexer.task.default.context` map
- Add method `AggregatorFactory.factorizeWithSize()` that returns an `AggregatorAndSize` which contains
  the aggregator instance and the estimated initial size of the aggregator
- Add method `Aggregator.aggregateWithSize()` which returns the incremental memory used by this aggregation step
- Update the method `DimensionIndexer.processRowValsToKeyComponent()` to return the encoded key component as well as its effective size in bytes
- Update `OnHeapIncrementalIndex` to use the new estimations only if `useMaxMemoryEstimates = false`
kfaraz added a commit that referenced this issue May 13, 2022
Changes:
- Use apache datasketches version 3.2.0.
- Remove unsafe reflection-based usage of datasketch internals added in #12022