Add streaming aggregation as the last step of ConcurrentGrouper if data are spilled #4704
Conversation
@@ -250,6 +250,7 @@ When using the "v2" strategy, the following query context parameters apply:
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|
|`forcePushDownLimit`|When all fields in the orderby are part of the grouping key, the broker will push limit application down to the historical nodes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|
|`forceHashAggregation`|Force to use hash-based aggregation.|
|`forceSingleThreadedCombine`|Force to use single-threaded combine.|
Is it really needed to have this config? Do you imagine anybody will turn it on?
Yes, it's needed because streaming aggregation is not possible once the dictionary gets full (https://github.com/druid-io/druid/pull/4704/files#diff-8bb57c81d0c016c3203d4b3af9b741a4R50). Maybe we can add multi-threaded combining with spilling later if it's worthwhile.
@leventov I removed this in the latest patch. Now, Druid can decide the combining strategy by itself. It first merges the dictionaries generated by the underlying SpillingGroupers and checks whether the size of the merged dictionary exceeds a given threshold. If it doesn't, the parallel combining strategy is chosen.
I found that parallel combining can create too many dictionaries, which incurs two additional overheads: duplicated dictionary generation at runtime and too much dictionary space. I changed the title to reflect this fix.
Now ready for review.
In the latest patch, I changed the code to merge the dictionaries generated by the underlying SpillingGroupers and to let StreamingMergeSortedGroupers reuse the same merged dictionary without generating it at runtime.
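Below is a minimal, self-contained sketch of the "merge the dictionaries first, fall back if they get too large" decision described above. The class, method, and threshold names are assumptions for illustration, not the actual Druid code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class DictionaryMergeSketch
{
  /**
   * Merges per-grouper string dictionaries into one sorted dictionary, or returns null if the
   * merged dictionary would exceed the given size threshold, in which case the caller falls
   * back to single-threaded combining.
   */
  public static List<String> tryMergeDictionary(List<Set<String>> perGrouperDictionaries, long maxSizeBytes)
  {
    final TreeSet<String> merged = new TreeSet<>();
    long estimatedSizeBytes = 0;
    for (Set<String> dictionary : perGrouperDictionaries) {
      for (String value : dictionary) {
        if (merged.add(value)) {
          estimatedSizeBytes += 2L * value.length(); // rough UTF-16 size estimate
          if (estimatedSizeBytes > maxSizeBytes) {
            return null; // too large to share across combining threads
          }
        }
      }
    }
    return new ArrayList<>(merged);
  }
}
```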
Also a question about ConcurrentGrouper: couldn't its complex parallel hierarchical logic be more elegantly expressed with ForkJoinPool?
@@ -221,9 +244,7 @@ public void reset()
}

for (Grouper<KeyType> grouper : groupers) {
May replace with forEach
Changed.
    parallelSortAndGetGroupersIterator() :
    getGroupersIterator(sorted);

// Parallel combine is used only when data are spilled. This is because ConcurrentGrouper uses two different modes
"is" spilled
Thanks. Fixed.
// Parallel combine is used only when data are spilled. This is because ConcurrentGrouper uses two different modes
// depending on data are spilled or not. If data is not spilled, all inputs are completely aggregated and no more
Same
Fixed.
if (sorted && spilling && isParallelizable()) {
  // First try to merge dictionaries generated by all underlying groupers. If it is merged successfully, the same
  // merged dictionary is used for all combining threads
  final List<String> mergedDictionary = mergeDictionary();
merge/merged -- choose one
Maybe something like "tryCreateMergedDictionary" or "tryMergeDictionary"
Changed to tryMergeDictionary
    return parallelCombine(sortedIterators, mergedDictionary);
  }
}
return Groupers.mergeIterators(sortedIterators, sorted ? keyObjComparator : null);
The function Groupers.mergeIterators() shouldn't act as "concat" if the provided comparator is null; it's very confusing. It should accept only a non-null comparator, and at the call sites there should be an if-else (e.g. here) with "concat" in the other branch.
Sounds good. Refactored Groupers.mergeIterators().
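For illustration, a hedged sketch of the call-site shape discussed here: the merge path requires a non-null comparator, and the unsorted case becomes an explicit concat branch at the caller. This sketch uses Guava utilities and invented names, not the actual Groupers.mergeIterators() code.

```java
import com.google.common.collect.Iterators;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class MergeOrConcatSketch
{
  public static <T> Iterator<T> combine(List<Iterator<T>> inputIterators, boolean sorted, Comparator<T> comparator)
  {
    if (sorted) {
      // Merge assumes each input iterator is already sorted by the (non-null) comparator.
      return Iterators.mergeSorted(inputIterators, comparator);
    } else {
      // No ordering requested: simply concatenate the inputs.
      return Iterators.concat(inputIterators.iterator());
    }
  }
}
```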
    settableColumnSelectorFactory,
    combiningFactories
);
grouper.init();
Why not call this inside the lambda?
Moved.
This should be called outside because it initializes some variables like curWriteIndex, nextReadIndex, and finished, which must be done before calling iterator(). I reverted this change.
 *
 * @return an iterator of the root of the combining tree
 */
private Iterator<Entry<KeyType>> buildCombineTree(
Better to return a pair of Iterator and List<Future>, then provide the List<Future> "to fill".
Changed.
 * @return a pair of degree and number of buffers if found. Otherwise null.
 */
@Nullable
private static Pair<Integer, Integer> findCombineDegreeAndNumBuffers(
I read this method and computeRequiredBufferNum() many times and still don't understand their meaning. It needs some simplification/better explanation. Also, "degree" everywhere over here is unclear; what does that actually mean?
The parallelCombine() method builds a combining tree which asynchronously combines input entries. Each node in the tree is a combining task which iterates through its child iterators, aggregates the inputs from those iterators, and returns an iterator over the result of the aggregation. The "degree" here means the number of children of a tree node; more precisely, it means how many child groupers are aggregated by a parent grouper.
This method finds the degree of the combining tree and the required number of buffers that maximize the degree of parallelism. This is needed because we want to maximize parallelism while keeping the size of each buffer slice greater than the minimum buffer size required by StreamingMergeSortedGrouper.
I extracted parallelCombine() as a class and added more comments to this and related methods.
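To make the "degree" idea concrete, here is a simplified, hypothetical sketch rather than the actual findCombineDegreeAndNumBuffers() implementation: each combining node needs one buffer slice, so the number of slices follows from the number of leaf groupers and the degree, and the search picks the smallest degree (most parallelism) whose slices still meet a minimum size.

```java
public class CombineTreeSketch
{
  /** Number of combining nodes in a tree with the given number of leaf groupers and degree. */
  static int numTreeNodes(int numLeaves, int degree)
  {
    if (numLeaves <= degree) {
      return 1; // a single root combines all leaves
    }
    final int numParents = (numLeaves + degree - 1) / degree; // ceiling division
    return numParents + numTreeNodes(numParents, degree);
  }

  /** Finds the smallest degree (>= 2) whose tree still fits into the combine buffer, or -1 if none does. */
  static int findCombineDegree(int numLeaves, long bufferCapacity, long minSliceSize)
  {
    for (int degree = 2; degree <= numLeaves; degree++) {
      final int requiredBuffers = numTreeNodes(numLeaves, degree);
      if (bufferCapacity / requiredBuffers >= minSliceSize) {
        return degree; // smaller degree means more nodes, hence more parallelism
      }
    }
    return -1; // no feasible degree; caller falls back or raises an error
  }
}
```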
  return grouper;
}

@Override
public void close()
{
  closed = true;
  for (Grouper<KeyType> grouper : groupers) {
Could use forEach
Changed.
Maybe not close groupers if already closed = true
Good point. Added.
There are two reasons I didn't use F/J. First, the executor used in ConcurrentGrouper (and now in ParallelCombiner as well) is a sort of fixed-size thread pool executor, and the pool size is configured by users (DruidProcessingConfig). To use F/J, we would need another configuration, which should be avoided if possible.
public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
{
  private static final Logger LOG = new Logger(StreamingMergeSortedGrouper.class);
  private static final long DEFAULT_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(5); // default timeout for spinlock
Not actually for "spinlock", but in general
Ah, it should be the timeout for spinlock. Fixed it. Thanks.
// We additionally check that nextReadIndex is -1 here because the writing thread should wait for the reading
// thread to start reading only when the writing thread tries to overwrite the first slot for the first time.
waitFor(
    notUsed -> (nextReadIndex == -1 || nextReadIndex == 0) && !Thread.currentThread().isInterrupted(),
!Thread.currentThread().isInterrupted() is a part of all conditions; better to move it into the waitFor() method.
Are you sure that a new lambda object is not created on each call? But if a lambda is cached in a field, it will harm readability
Good point. I could see some performance degradation with using lambda. Reverted this change.
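A minimal, self-contained sketch of the inlined spin-wait pattern that was kept instead of a lambda-based waitFor(); the class, field, and message names are illustrative, not the actual StreamingMergeSortedGrouper code.

```java
import java.util.concurrent.TimeUnit;

public class SpinWaitSketch
{
  private volatile int nextReadIndex = -1; // -1 until the reading thread starts

  // The writer busy-waits until the reader moves past the first slot, failing on timeout or interruption.
  void waitForReaderToCatchUp(long queryTimeoutAtNs)
  {
    while ((nextReadIndex == -1 || nextReadIndex == 0) && !Thread.currentThread().isInterrupted()) {
      if (System.nanoTime() - queryTimeoutAtNs >= 0) {
        throw new RuntimeException("Timed out waiting for the reading thread");
      }
      Thread.yield(); // brief back-off to avoid burning a full core
    }
  }

  void writeFirstSlotAgain()
  {
    // Example: a deadline five seconds from now.
    waitForReaderToCatchUp(System.nanoTime() + TimeUnit.SECONDS.toNanos(5));
    // ... overwrite the first slot once the reader has moved past it ...
  }
}
```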
 * Wait while the given predicate is true. Throws an exception if the current nano time is greater than the given
 * timeout.
 */
private static void waitFor(Predicate<Void> predicate, long queryTimeoutAtNs)
Better to create a nested private interface called Predicate or Condition with a single no-arg boolean-returning method.
Removed this method.
 */
private static void waitFor(Predicate<Void> predicate, long queryTimeoutAtNs)
{
  final long startAt = System.nanoTime();
In this method, there is an inconsistency in using the -Ns suffix.
Removed this method.
@jihoonson When I run the tests on this branch locally through
My max files setting is already pretty large:
Is the ConcurrentGrouperTest doing something that opens a really big number of files? Fwiw, the tests pass after raising the setting:
@gianm thank you for the testing. You're correct. ConcurrentGrouperTest generates too many files. Fixed now.
@gianm do you have more comments?
@jihoonson I asked @jon-wei to take a look at this tomorrow and then we should be good. I probably won't review it closely myself. The only question I have is a high level one. It used to be that the combining of spill files happened in an http thread, but now it will happen in the processing thread pool. What happens if they generate merged rows faster than the http thread can send them out to the client? Will they queue up infinitely in memory and will the historical run OOM? Or will they block processing threads and prevent other queries from running? Or something else? This kind of thing is just on my mind since #4933 recently came up for a similar issue on the broker, and I'm wondering if it can happen here.
Each processing thread has a merge buffer of a fixed size. Once the buffer is full, they are blocked until some buffer space becomes available for further merging. StreamingMergeSortedGrouper respects the query timeout, so if the http thread consumes the merged rows very slowly, that query will fail with a timeout exception. However, it can still block other queries until it expires, and this might be a problem. So, I talked with @gianm and added a new configuration as a concurrency hint for parallel combining. The default value of the configuration is 1, which turns off the parallel combining feature.
@@ -220,7 +220,8 @@ When using the "v2" strategy, the following runtime properties apply:
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
|`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false|
|`druid.query.groupBy.forceHashAggregation`|Force to use hash-based aggregation.|false|
|`druid.query.groupBy.intermediateCombineDegree`|Number of intermediate nodes combined together in the combining tree. Higher degrees will need a less numer of threads which might be helpful to improve the query performance by reducing the overhead of too many threads if the server has sufficiently powerful cpu cores.|8|
|`druid.query.groupBy.numParallelCombineThreads`|Hint for the number of parallel combining threads. This should be larger than 1 to turn on the parallel combining feature. The actual number of threads used for parallel combining is min(`druid.query.groupBy.numParallelCombineThreads`, `druid.processing.numThreads`).|1|
Those configs are impossible to get right for an average user. Either they should be simpler, or descriptions should be much more elaborate, explaining in what conditions user would want to change the values, and to what particular values
For this patch, I think more elaborate descriptions are the way to go, since I bet that making them simpler would imply some major work on the query engine. So I think we should do some descriptions that let users know when they might want to change these properties from the default values.
Additionally, since we have a lot of groupBy settings now, I'd like to organize them into "commonly tuned configs" and "advanced configs, not commonly changed". This will help users focus on the tunings that have the most impact on performance. IMO the commonly tuned configs would be:
- druid.processing.numMergeBuffers (tuned to trade off concurrent query capability vs. memory footprint)
- druid.query.groupBy.maxMergingDictionarySize (if you do large groupBys, this should be tuned to fit well with druid.processing.numMergeBuffers and your JVM heap setting. if you don't do large groupBys, it doesn't matter much)
- druid.query.groupBy.maxOnDiskStorage (if you do large groupBys, this should be tuned to fit your needs to use disk for aggregation)
Hopefully, the rest of the parameters are not commonly tuned. I'm not sure if the new ones would be in that category or not. @jihoonson / @leventov what do you think?
I agree
Sounds good. I'll improve the document.
I added new sections 'Performance tuning for groupBy v2', 'commonly tuned configurations', and 'advanced configurations'.
You can see it in a rendered form here.
Some minor comments on the new documentation. Other than that it looks great. Thanks for rewriting it.
|`maxOnDiskStorage`|`maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|

##### Configurations for groupBy v1
I would move groupBy v1 configs outside of the 'common' section, it has been non-default since 0.10.0 and probably not commonly deployed anymore.
Moved.
|Server runtime property (prefixed by `druid.query.groupBy.`)|Query context parameter|Description|Default|
|-----------------------|-----------------------|-----------|-------|
|`maxMergingDictionarySize`|`maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
IMO, it's better to put the full runtime property name here, since it helps users if they want to copy-and-paste it into their config files. And often people skim a doc rather than read every word, and so it's easy to miss the header that says "prefix must be druid.query.groupBy.".
Yeah, that should be better for copy-and-paste. Reverted some changes.
This should mention that the context parameter can only lower the amount of memory, not raise it. Same for maxOnDiskStorage. It's important for users that may wonder why setting the context value doesn't do anything.
Thanks. Added.
Changing my review to comment instead of approve since I have not looked at the patch closely. The general idea looks good to me though.
I did a design-level review, LGTM.
Had a couple of comments about exception messages.
I'll merge after fixing the conflict and addressing @gianm's comments on docs.
  }
}

throw new ISE("Cannot find a proper leaf combine degree. Try increasing druid.processing.buffer.sizeBytes.");
I don't think it needs to be addressed in this specific PR, but maybe it would be nice to have more information in the exception message here that guides users towards what change they need to make to druid.processing.buffer.sizeBytes. Maybe if it calculated the bounds of sizeBytes needed to support MINIMUM_LEAF_COMBINE_DEGREE and the highest leafCombineDegree where requiredBufferNum is still less than numAvailableThreads, or something of that nature.
Sounds nice. I added some description.
public void finish()
{
  increaseWriteIndex();
  // Once finished is set, curWirteIndex must not be changed. This guarantees that the remaining number of items in
curWirteIndex -> curWriteIndex
Fixed. Thanks.
 * and must not be called by multiple threads concurrently.
 * Some implementations allow writes even after this method is called. After you are done with the iterator
 * returned by this method, you should either call {@link #close()} (if you are done with the Grouper) or
 * {@link #reset()} (if you want to reuse it). Some implmenetations allow calling {@link #iterator(boolean)} again if
implmenetations -> implementations
Fixed. Thanks.
if (i < numBuffers) {
  return Groupers.getSlice(combineBuffer, sliceSize, i++);
} else {
  throw new ISE("Requested number of buffer slices exceeds the planned one");
Let's add the number of requested buffer slices and the planned number for better debugging
Added.
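For illustration only, a sketch of buffer slicing with the more informative error message suggested above; the method and parameter names are assumptions, not the actual Groupers.getSlice() code.

```java
import java.nio.ByteBuffer;

public class BufferSliceSketch
{
  static ByteBuffer getSlice(ByteBuffer combineBuffer, int sliceSize, int requested, int numBuffers)
  {
    if (requested >= numBuffers) {
      // Report both the requested slice index and the planned number of slices for easier debugging.
      throw new IllegalStateException(
          String.format("Requested buffer slice [%d] exceeds the planned number of slices [%d]", requested, numBuffers)
      );
    }
    final ByteBuffer slice = combineBuffer.duplicate();
    slice.position(sliceSize * requested);
    slice.limit(slice.position() + sliceSize);
    return slice.slice();
  }
}
```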
@@ -179,14 +180,42 @@ disk space.

With groupBy v2, cluster operators should make sure that the off-heap hash tables and on-heap merging dictionaries
will not exceed available memory for the maximum possible concurrent query load (given by
druid.processing.numMergeBuffers). Especially the amount of direct memory needed by Druid is at least
I recall there is a dedicated section about this in docs, maybe refer to it instead of copying
Good point, this is in docs/content/operations/performance-faq
Thanks. Changed.
The groupBy v2 engine uses an open addressing hash table for aggregation. The hash table is initialized with a given initial bucket number and gradually grows on buffer full. On hash collisions, the linear probing technique is used.

The default number of initial buckets is 1024 and the default max load factor of the hash table is 0.7. If you can see too many collisions in the hash table, you can adjust these numbers. See `bufferGrouperInitialBuckets` and `bufferGrouperMaxLoadFactor` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
How could a user "see too many collisions"?
Well, this is not easy for users. I think currently the only possible way is method-level profiling. I think we need to provide some metrics for hash table collisions and growths.
##### Parallel combine

Once a historical finishes aggregation using the hash table, it sorts data before sending to the broker for N-way merge aggregation in the broker. By default, historicals use all their available processing threads (configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting which is an http thread to send data to brokers.
"By default, historicals use all their available processing threads (configured by druid.processing.numThreads
) for aggregation" -- means, each groupBy query occupies all processing threads?
BTW, please break lines in docs, it's not convenient to comment specific thing, and also to look in IDE
each groupBy query occupies all processing threads?
Yes, currently it takes all processing threads.
This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared between all submitted queries and they are _not interruptible_. It means, if a heavy query takes all available processing threads, all other queries might be blocked until the heavy query is finished. GroupBy queries usually take longer time than timeseries or topN queries, they should release processing threads as soon as possible.
"they should release processing threads as soon as possible" -- "should" quite doesn't make sense to me here. "GroupBy queries" are not "proactive" neither they could affect when do they release threads
What do you mean by "neither they could affect when do they release threads"?
However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck of heavy groupBy queries is the sorting. In such cases, you can use processing threads for sorting as well. This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled for only when data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)).
"Note that parallel combine can be enabled for only when data is actually spilled"
- Remove "for"?
- data spilling concept is not introduced yet, making this sentence non-sense for a fresh reader.
Removed "for".
Data spilling is explained in the "Memory tuning and resource limits" section linked here. Do you think it's not enough?
Once parallel combine is enabled, the groupBy v2 engine creates a combining tree. Each intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge aggregates from hash tables including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they need to read data from disk. As a result, a less number of threads are used for intermediate nodes by default. You can change the degree of intermediate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
"Once parallel combine is enabled, the groupBy v2 engine creates a combining tree" -- makes impression that parallel combine is enabled right during the query execution.
I think better "less threads are used" instead of "A less number of threads"
Thanks. Fixed.
|Server runtime property (prefixed by `druid.query.groupBy.`)|Query context parameter|Description|Default|
|-----------------------|-----------------------|-----------|-------|
|`bufferGrouperInitialBuckets`|`bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0|
What is "reasonable default" here?
Added to doc.
|`bufferGrouperMaxLoadFactor`|`bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0|
And here
Added to doc.
|`forceHashAggregation`|`forceHashAggregation`|Force to use hash-based aggregation.|false|
|`intermediateCombineDegree`|`intermediateCombineDegree`|Number of intermediate nodes combined together in the combining tree. Higher degrees will need a less numer of threads which might be helpful to improve the query performance by reducing the overhead of too many threads if the server has sufficiently powerful cpu cores.|8|
"less threads"
Fixed.
|`numParallelCombineThreads`|`numParallelCombineThreads`|Hint for the number of parallel combining threads. This should be larger than 1 to turn on the parallel combining feature. The actual number of threads used for parallel combining is min(`druid.query.groupBy.numParallelCombineThreads`, `druid.processing.numThreads`).|1 (disabled)|
"The actual number of threads used for parallel combining is min(druid.query.groupBy.numParallelCombineThreads
, druid.processing.numThreads
)" -- unless the value is 1? IMO too complicated way to set property. Maybe add a separate boolean property, and require numParallelCombineThreads > 1, if configured.
Hmm, adding another property to turn on parallel combining looks more complicated to me. I added a check that druid.query.groupBy.numParallelCombineThreads cannot be larger than druid.processing.numThreads instead of calculating the min of them.
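A hedged sketch of that validation, with assumed names; values of 1 or less leave parallel combine disabled.

```java
public class ParallelCombineConfigSketch
{
  static int validateNumParallelCombineThreads(int numParallelCombineThreads, int numProcessingThreads)
  {
    // Reject configurations where the combining hint exceeds the processing thread count.
    if (numParallelCombineThreads > numProcessingThreads) {
      throw new IllegalArgumentException(
          String.format(
              "numParallelCombineThreads[%d] cannot be larger than numThreads[%d]",
              numParallelCombineThreads,
              numProcessingThreads
          )
      );
    }
    return numParallelCombineThreads; // <= 1 means parallel combine stays off
  }
}
```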
LGTM
@leventov did you have any more comments for this PR?
public ResourceHolder<ByteBuffer> get()
{
  if (!initialized) {
    buffer = processingBufferPool.take();
This changes an assumption made at historicals and realtime task processes that the processing buffer pool would only be asked for buffers from processing threads (one per thread), which kept usage of the processing buffer pool restricted so that no processing thread would need to block to get a buffer.
(Came across this while doing #5345.)
I think that this buffer should really come from the merge pool. What do you think? @gianm @jihoonson
It should definitely not be from the processing pool. The merge pool might be the right place, however if so, this should be combined with the other code in the same file that pulls from the merge pool in order to make it atomic. Otherwise we could get deadlocks.
@jihoonson what do you think?
Hmm.. I had a reason for using the processing pool when I wrote this code, but I no longer remember it. Also, using the blocking pool looks like it makes much more sense now. As @gianm said, the buffer should be acquired atomically at the beginning of query processing in historicals and realtime tasks.
@himanshug do you have a plan to make a patch for this? Or I'll make it.
@jihoonson please feel free to do the patch. thanks.
@jihoonson nvmd, let me fix it in #5345. It's a small change.
@himanshug thanks. I'll review that PR.
ConcurrentGrouper has two modes. Here is the doc from ConcurrentGrouper.
Once the data are spilled on disk, records having the same grouping keys can be stored in different groupers and further aggregation is needed. Currently, this is combined by the single-threaded ResultMergeQueryRunner, but it can be a problem if there are too many records to be combined. In this patch, I added a combining tree as the last step of ConcurrentGrouper which does a streaming combine of records using multiple threads. Here is the benchmark result.