
Add streaming aggregation as the last step of ConcurrentGrouper if data are spilled #4704

Merged
merged 50 commits into from
Oct 18, 2017

Conversation

jihoonson
Contributor

@jihoonson jihoonson commented Aug 19, 2017

ConcurrentGrouper has two modes. Here is the doc from ConcurrentGrouper.

As long as the result set fits in memory, keys are partitioned between buffers based on their hash, and multiple threads can write into the same buffer. When it becomes clear that the result set does not fit in memory, the table switches to a mode where each thread gets its own buffer and its own spill files on disk.

Once the data are spilled on disk, records having the same grouping keys can be stored in different groupers, and further aggregation is needed. Currently, this combining is done by the single-threaded ResultMergeQueryRunner, which can be a problem if there are too many records to combine.

In this patch, I added a combining tree as the last step of ConcurrentGrouper, which does streaming combining of records using multiple threads. Here is the benchmark result.

master

Benchmark                                              (defaultStrategy)  (initialBuckets)  (numProcessingThreads)  (numSegments)  (queryGranularity)  (rowsPerSegment)  (schemaAndQuery)  Mode  Cnt       Score      Error  Units
GroupByBenchmark.queryMultiQueryableIndexWithSpilling                 v2                -1                       8              8                 all            100000           basic.A  avgt   30  494360.446 ± 7685.195  us/op

patch

Benchmark                                              (defaultStrategy)  (initialBuckets)  (numProcessingThreads)  (numSegments)  (queryGranularity)  (rowsPerSegment)  (schemaAndQuery)  Mode  Cnt       Score      Error  Units
GroupByBenchmark.queryMultiQueryableIndexWithSpilling                 v2                -1                       8              8                 all            100000           basic.A  avgt   30  306363.134 ± 4857.398  us/op


@@ -250,6 +250,7 @@ When using the "v2" strategy, the following query context parameters apply:
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|
|`forcePushDownLimit`|When all fields in the orderby are part of the grouping key, the broker will push limit application down to the historical nodes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|
|`forceHashAggregation`|Force to use hash-based aggregation.|
|`forceSingleThreadedCombine`|Force to use single-threaded combine.|
Member

Is it really needed to have this config? Do you imagine anybody will turn it on?

Contributor Author

Yes, it's needed because streaming aggregation is not possible once the dictionary gets full (https://github.com/druid-io/druid/pull/4704/files#diff-8bb57c81d0c016c3203d4b3af9b741a4R50). Maybe we can add multi-threaded combining with spilling later if it's worthwhile.

Contributor Author

@leventov I removed this in the latest patch. Now, Druid can choose the combining strategy by itself. It first merges the dictionaries generated by the underlying SpillingGroupers and checks whether the size of the merged dictionary exceeds a given threshold. If it does not, the parallel combining strategy is chosen.

@jihoonson jihoonson changed the title Add streaming aggregation as the last step of ConcurrentGrouper if data are spilled [WIP] Add streaming aggregation as the last step of ConcurrentGrouper if data are spilled Aug 24, 2017
@jihoonson
Contributor Author

jihoonson commented Aug 24, 2017

I found parallel combining can create too many dictionaries, which incurs two additional overheads: duplicated dictionary generation at runtime and too much dictionary space. I changed the title to [WIP] while fixing this problem.

@jihoonson jihoonson changed the title [WIP] Add streaming aggregation as the last step of ConcurrentGrouper if data are spilled Add streaming aggregation as the last step of ConcurrentGrouper if data are spilled Aug 25, 2017
@jihoonson
Contributor Author

Now ready for review.

@jihoonson
Contributor Author

In the latest patch, I changed the code to merge the dictionaries generated by the underlying SpillingGroupers and let StreamingMergeSortedGroupers use the same merged dictionary without runtime generation.
Here is the benchmark result using the merged dictionary: another 10% performance gain.

Benchmark                                              (defaultStrategy)  (initialBuckets)  (numProcessingThreads)  (numSegments)  (queryGranularity)  (rowsPerSegment)  (schemaAndQuery)  Mode  Cnt       Score      Error  Units
GroupByBenchmark.queryMultiQueryableIndexWithSpilling                 v2                -1                       8              8                 all            100000           basic.A  avgt   30  272800.159 ± 2655.104  us/op
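For illustration, the merged-dictionary idea above can be sketched roughly as follows. The names and the size accounting are hypothetical, not Druid's actual API: the point is merging per-grouper string dictionaries into one, and bailing out to the fallback strategy when the merged size exceeds a threshold.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of merging per-grouper dictionaries into one shared dictionary.
public class DictionaryMergeSketch
{
  // Returns the merged dictionary, or null if its estimated size exceeds maxSizeBytes,
  // in which case the caller falls back to the single-threaded combining strategy.
  public static List<String> tryMergeDictionary(List<List<String>> dictionaries, long maxSizeBytes)
  {
    final Set<String> merged = new LinkedHashSet<>();
    long sizeBytes = 0;
    for (List<String> dictionary : dictionaries) {
      for (String term : dictionary) {
        if (merged.add(term)) {
          sizeBytes += 2L * term.length(); // rough UTF-16 size estimate
          if (sizeBytes > maxSizeBytes) {
            return null; // too large; give up on the shared dictionary
          }
        }
      }
    }
    return new ArrayList<>(merged);
  }
}
```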

Member

@leventov leventov left a comment

Also a question about ConcurrentGrouper: couldn't its complex parallel hierarchical logic be expressed more elegantly with ForkJoinPool?

@@ -221,9 +244,7 @@ public void reset()
}

for (Grouper<KeyType> grouper : groupers) {
Member

May replace with forEach

Contributor Author

Changed.

parallelSortAndGetGroupersIterator() :
getGroupersIterator(sorted);

// Parallel combine is used only when data are spilled. This is because ConcurrentGrouper uses two different modes
Member

"is" spilled

Contributor Author

Thanks. Fixed.

getGroupersIterator(sorted);

// Parallel combine is used only when data are spilled. This is because ConcurrentGrouper uses two different modes
// depending on data are spilled or not. If data is not spilled, all inputs are completely aggregated and no more
Member

Same

Contributor Author

Fixed.

if (sorted && spilling && isParallelizable()) {
// First try to merge dictionaries generated by all underlying groupers. If it is merged successfully, the same
// merged dictionary is used for all combining threads
final List<String> mergedDictionary = mergeDictionary();
Member

merge/merged -- choose one

Member

Maybe something like "tryCreateMergedDictionary" or "tryMergeDictionary"

Contributor Author

Changed to tryMergeDictionary

return parallelCombine(sortedIterators, mergedDictionary);
}
}
return Groupers.mergeIterators(sortedIterators, sorted ? keyObjComparator : null);
Member

Function Groupers.mergeIterators() shouldn't act as "concat" if the provided comparator is null, it's very confusing. It should accept only non-null comparator, and at the call sites there should be if-else (e. g. here) with "concat" in the other branch.

Contributor Author

Sounds good. Refactored Groupers.mergeIterators().
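The suggested split can be sketched like this. It is an illustrative reimplementation, not the actual Groupers code: mergeSorted() insists on a non-null comparator, and call sites branch to an explicit concat() for the unsorted case.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;

// Illustrative sketch of the refactoring: no more "null comparator means concat".
public class MergeSketch
{
  // K-way merge of individually sorted iterators into one sorted list.
  public static <T> List<T> mergeSorted(List<Iterator<T>> iterators, Comparator<? super T> comparator)
  {
    Objects.requireNonNull(comparator, "comparator must be non-null; use concat() for unsorted input");
    final List<T> heads = new ArrayList<>();
    // The heap orders iterator indexes by their current head element.
    final PriorityQueue<Integer> heap = new PriorityQueue<>(
        (a, b) -> comparator.compare(heads.get(a), heads.get(b))
    );
    for (int i = 0; i < iterators.size(); i++) {
      heads.add(null);
      if (iterators.get(i).hasNext()) {
        heads.set(i, iterators.get(i).next());
        heap.add(i);
      }
    }
    final List<T> result = new ArrayList<>();
    while (!heap.isEmpty()) {
      final int i = heap.poll();
      result.add(heads.get(i));
      if (iterators.get(i).hasNext()) {
        heads.set(i, iterators.get(i).next());
        heap.add(i);
      }
    }
    return result;
  }

  // Plain concatenation for the unsorted case, chosen explicitly at the call site.
  public static <T> List<T> concat(List<Iterator<T>> iterators)
  {
    final List<T> result = new ArrayList<>();
    for (Iterator<T> it : iterators) {
      while (it.hasNext()) {
        result.add(it.next());
      }
    }
    return result;
  }
}
```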

settableColumnSelectorFactory,
combiningFactories
);
grouper.init();
Member

Why not call this inside the lambda?

Contributor Author

Moved.

Contributor Author

@jihoonson jihoonson Sep 8, 2017

This should be called outside because it initializes some variables, like curWriteIndex, nextReadIndex, and finished, which must be set before calling iterator(). I reverted this change.

*
* @return an iterator of the root of the combining tree
*/
private Iterator<Entry<KeyType>> buildCombineTree(
Member

Better to return a pair of Iterator and List<Future>, then provide List<Future> "to fill"

Contributor Author

Changed.

* @return a pair of degree and number of buffers if found. Otherwise null.
*/
@Nullable
private static Pair<Integer, Integer> findCombineDegreeAndNumBuffers(
Member

I read this method and computeRequiredBufferNum() many times and still don't understand their meaning. It needs some simplification/better explanation. Also "degree" everywhere over here is unclear, what does that actually mean?

Contributor Author

@jihoonson jihoonson Sep 2, 2017

The parallelCombine() method builds a combining tree which asynchronously combines input entries. Each node in the tree is a combining task which iterates through its child iterators, aggregates the inputs from those iterators, and returns an iterator over the result of the aggregation. The "degree" here means the number of children of a tree node; more precisely, how many child groupers are aggregated by a parent grouper.

This method finds the combine degree and the required number of buffers that maximize the degree of parallelism. This is needed because we want to maximize parallelism while keeping each buffer slice larger than the minimum buffer size required by StreamingMergeSortedGrouper.

I extracted parallelCombine() as a class and added more comments to this and related methods.
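Assuming a tree where every non-leaf node consumes one buffer slice, the buffer counting behind this search could look roughly like the following. This is a hypothetical sketch with illustrative names, not the actual computeRequiredBufferNum(); a caller would scan degrees upward and pick the first one whose resulting slices are still large enough for StreamingMergeSortedGrouper.

```java
// Hypothetical sketch: count the buffer slices a combining tree needs
// for a given fan-in ("degree") over numChildren leaf iterators.
public class CombineTreeSketch
{
  public static int requiredBufferNum(int numChildren, int degree)
  {
    if (numChildren <= degree) {
      return 1; // a single root node combines all children
    }
    // One parent per full (or partial) group of `degree` children, plus the
    // buffers needed to combine those parents in turn.
    final int numParents = (numChildren + degree - 1) / degree;
    return numParents + requiredBufferNum(numParents, degree);
  }
}
```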


return grouper;
}

@Override
public void close()
{
closed = true;
for (Grouper<KeyType> grouper : groupers) {
Member

Could use forEach

Contributor Author

Changed.


return grouper;
}

@Override
public void close()
{
closed = true;
for (Grouper<KeyType> grouper : groupers) {
Member

Maybe not close groupers if already closed = true

Contributor Author

Good point. Added.
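The guarded close being discussed can be sketched as follows (an illustrative class, not the actual ConcurrentGrouper): a second call to close() becomes a no-op.

```java
import java.io.Closeable;
import java.util.List;

// Sketch of an idempotent close(): underlying groupers are closed at most once.
public class GuardedClose implements Closeable
{
  private final List<Closeable> groupers;
  private boolean closed = false;

  public GuardedClose(List<Closeable> groupers)
  {
    this.groupers = groupers;
  }

  @Override
  public void close()
  {
    if (!closed) {
      closed = true;
      groupers.forEach(grouper -> {
        try {
          grouper.close();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
    }
  }
}
```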

@jihoonson
Contributor Author

Also a question about ConcurrentGrouper: couldn't its complex parallel hierarchical logic be expressed more elegantly with ForkJoinPool?

There are two reasons I didn't use F/J. First, the executor used in ConcurrentGrouper (and now in ParallelCombiner as well) is a fixed-size thread pool executor whose pool size is configured by users (DruidProcessingConfig). To use F/J, we would need another configuration, which should be avoided if possible.
Another reason is that there is some debate about how well F/J performs (http://www.coopsoft.com/ar/CalamityArticle.html). I was not sure F/J is a good fit for this kind of work.

public class StreamingMergeSortedGrouper<KeyType> implements Grouper<KeyType>
{
private static final Logger LOG = new Logger(StreamingMergeSortedGrouper.class);
private static final long DEFAULT_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(5); // default timeout for spinlock
Member

Not actually for "spinlock", but in general

Contributor Author

Ah, it should be the timeout for spinlock. Fixed it. Thanks.

// We additionally check that nextReadIndex is -1 here because the writing thread should wait for the reading
// thread to start reading only when the writing thread tries to overwrite the first slot for the first time.
waitFor(
notUsed -> (nextReadIndex == -1 || nextReadIndex == 0) && !Thread.currentThread().isInterrupted(),
Member

!Thread.currentThread().isInterrupted() is a part of all conditions, better moved to waitFor() method

Member

Are you sure that a new lambda object is not created on each call? But if a lambda is cached in a field, it will harm readability

Contributor Author

Good point. I did see some performance degradation when using a lambda. Reverted this change.

* Wait while the given predicate is true. Throws an exception if the current nano time is greater than the given
* timeout.
*/
private static void waitFor(Predicate<Void> predicate, long queryTimeoutAtNs)
Member

Better create nested private interface called Predicate or Condition with a single no-arg boolean-returning method

Contributor Author

Removed this method.
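For reference, a nanoTime-based wait with a timeout, in the spirit of the waitFor() under discussion, might look like the following. This is a hypothetical sketch (the actual method was removed from the patch), using a no-arg boolean supplier as the reviewer suggested and a consistent -Ns suffix.

```java
import java.util.function.BooleanSupplier;

// Sketch of a spin wait that gives up once a nanoTime deadline passes.
public class SpinWaitSketch
{
  public static void waitWhile(BooleanSupplier condition, long timeoutNs)
  {
    final long deadlineNs = System.nanoTime() + timeoutNs;
    while (condition.getAsBoolean()) {
      if (System.nanoTime() > deadlineNs) {
        throw new RuntimeException("Timed out waiting for the condition");
      }
      Thread.yield(); // back off a little between checks
    }
  }
}
```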

*/
private static void waitFor(Predicate<Void> predicate, long queryTimeoutAtNs)
{
final long startAt = System.nanoTime();
Member

In this method, there is an inconsistency in using the -Ns suffix

Contributor Author

Removed this method.

@gianm
Contributor

gianm commented Oct 3, 2017

@jihoonson When I run the tests on this branch locally through mvn test on my Mac, I get this error:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.19.1:test (default-test) on project druid-processing: Execution default-test of goal org.apache.maven.plugins:maven-surefire-plugin:2.19.1:test failed: java.lang.RuntimeException: java.io.FileNotFoundException: /Users/gian/Documents/imply/src/druid/processing/target/surefire-reports/io.druid.query.groupby.epinephelinae.ConcurrentGrouperTest-output.txt (Too many open files in system) -> [Help 1]

My max files setting is already pretty large:

$ sysctl kern.maxfiles
kern.maxfiles: 12288

Is the ConcurrentGrouperTest doing something that opens a really big number of files?

Fwiw, the tests pass after raising the setting:

$ sudo sysctl -w kern.maxfiles=20480
kern.maxfiles: 12288 -> 20480

@jihoonson
Contributor Author

@gianm thank you for testing. You're correct: ConcurrentGrouperTest generates too many files. Fixed now.

@jihoonson
Contributor Author

@gianm do you have more comments?

@gianm
Contributor

gianm commented Oct 12, 2017

@jihoonson I asked @jon-wei to take a look at this tomorrow and then we should be good. I probably won't review it closely myself. The only question I have is a high level one. It used to be that the combining of spill files happened in an http thread, but now it will happen in the processing thread pool. What happens if they generate merged rows faster than the http thread can send them out to the client?

Will they queue up infinitely in memory and will the historical run OOM? Or will they block processing threads and prevent other queries from running? Or something else?

This kind of thing is just on my mind since #4933 recently came up for a similar issue on the broker, and I'm wondering if it can happen here.

@jihoonson
Contributor Author

jihoonson commented Oct 12, 2017

It used to be that the combining of spill files happened in an http thread, but now it will happen in the processing thread pool. What happens if they generate merged rows faster than the http thread can send them out to the client?

Each processing thread has a merge buffer of a fixed size. Once the buffer is full, the thread is blocked until some buffer space becomes available for further merging. StreamingMergeSortedGrouper respects the query timeout, so if the http thread consumes the merged rows very slowly, the query will fail with a timeout exception. However, it can still block other queries until it expires, and this might be a problem.

So, I talked with @gianm and added a new configuration as a concurrency hint for parallel combining. The default value of the configuration is 1, which turns off the parallel combining feature.

@@ -220,7 +220,8 @@ When using the "v2" strategy, the following runtime properties apply:
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
|`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false|
|`druid.query.groupBy.forceHashAggregation`|Force to use hash-based aggregation.|false|
|`druid.query.groupBy.intermediateCombineDegree`|Number of intermediate nodes combined together in the combining tree. Higher degrees need fewer threads, which might help to improve query performance by reducing the overhead of too many threads if the server has sufficiently powerful CPU cores.|8|
|`druid.query.groupBy.numParallelCombineThreads`|Hint for the number of parallel combining threads. This should be larger than 1 to turn on the parallel combining feature. The actual number of threads used for parallel combining is min(`druid.query.groupBy.numParallelCombineThreads`, `druid.processing.numThreads`).|1|
Member

Those configs are impossible to get right for an average user. Either they should be simpler, or descriptions should be much more elaborate, explaining in what conditions user would want to change the values, and to what particular values

Contributor

@gianm gianm Oct 12, 2017

For this patch, I think more elaborate descriptions are the way to go, since I bet that making them simpler would imply some major work on the query engine. So I think we should do some descriptions that let users know when they might want to change these properties from the default values.

Additionally, since we have a lot of groupBy settings now, I'd like to organize them into "commonly tuned configs" and "advanced configs, not commonly changed". This will help users focus on the tunings that have the most impact on performance. IMO the commonly tuned configs would be:

  • druid.processing.numMergeBuffers (tuned to trade off concurrent query capability vs. memory footprint)
  • druid.query.groupBy.maxMergingDictionarySize (if you do large groupBys, this should be tuned to fit well with druid.processing.numMergeBuffers and your JVM heap setting. if you don't do large groupBys, it doesn't matter much)
  • druid.query.groupBy.maxOnDiskStorage (if you do large groupBys, this should be tuned to fit your needs to use disk for aggregation)

Hopefully, the rest of the parameters are not commonly tuned. I'm not sure if the new ones would be in that category or not. @jihoonson / @leventov what do you think?

Member

I agree

Contributor Author

Sounds good. I'll improve the document.

Contributor Author

I added new sections: 'Performance tuning for groupBy v2', 'commonly tuned configurations', and 'advanced configurations'.
You can see them in rendered form here.

Contributor

@gianm gianm left a comment

Some minor comments on the new documentation. Other than that it looks great. Thanks for rewriting it.

|`maxOnDiskStorage`|`maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|


##### Configurations for groupBy v1
Contributor

I would move groupBy v1 configs outside of the 'common' section, it has been non-default since 0.10.0 and probably not commonly deployed anymore.

Contributor Author

Moved.


|Server runtime property (prefixed by `druid.query.groupBy.`)|Query context parameter|Description|Default|
|-----------------------|-----------------------|-----------|-------|
|`maxMergingDictionarySize`|`maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
Contributor

IMO, it's better to put the full runtime property name here, since it helps users if they want to copy-and-paste it into their config files. And often people skim a doc rather than read every word, and so it's easy to miss the header that says "prefix must be druid.query.groupBy.".

Contributor Author

Yeah, that should be better for copy-and-paste. Reverted some changes.


|Server runtime property (prefixed by `druid.query.groupBy.`)|Query context parameter|Description|Default|
|-----------------------|-----------------------|-----------|-------|
|`maxMergingDictionarySize`|`maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
Contributor

This should mention that the context parameter can only lower the amount of memory, not raise it. Same for maxOnDiskStorage. It's important for users that may wonder why setting the context value doesn't do anything.

Contributor Author

Thanks. Added.

Contributor

@gianm gianm left a comment

Changing my review to comment instead of approve since I have not looked at the patch closely. The general idea looks good to me though.

Contributor

@jon-wei jon-wei left a comment

I did a design-level review, LGTM.

Had a couple of comments about exception messages.

I'll merge after fixing the conflict and addressing @gianm's comments on docs.

}
}

throw new ISE("Cannot find a proper leaf combine degree. Try increasing druid.processing.buffer.sizeBytes.");
Contributor

I don't think it needs to be addressed in this specific PR, but maybe it would be nice to have more information in the exception message here that guides users towards what change they need to make to druid.processing.buffer.sizeBytes. Maybe if it calculated the bounds of sizeBytes needed to support MINIMUM_LEAF_COMBINE_DEGREE and the highest leafCombineDegree where requiredBufferNum is still less than numAvailableThreads, or something of that nature.

Contributor Author

Sounds nice. I added some description.

public void finish()
{
increaseWriteIndex();
// Once finished is set, curWirteIndex must not be changed. This guarantees that the remaining number of items in
Contributor

curWirteIndex -> curWriteIndex

Contributor Author

Fixed. Thanks.

* and must not be called by multiple threads concurrently.
* Some implementations allow writes even after this method is called. After you are done with the iterator
* returned by this method, you should either call {@link #close()} (if you are done with the Grouper) or
* {@link #reset()} (if you want to reuse it). Some implmenetations allow calling {@link #iterator(boolean)} again if
Contributor

implmenetations -> implementations

Contributor Author

Fixed. Thanks.

if (i < numBuffers) {
return Groupers.getSlice(combineBuffer, sliceSize, i++);
} else {
throw new ISE("Requested number of buffer slices exceeds the planned one");
Contributor

Let's add the number of requested buffer slices and the planned number for better debugging

Contributor Author

Added.
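Slicing a large combining buffer into fixed-size pieces, in the spirit of the Groupers.getSlice() call shown above, can be sketched like this (an illustrative reimplementation, not Druid's code):

```java
import java.nio.ByteBuffer;

// Sketch: carve slice i of size sliceSize out of a larger buffer.
// The slice shares storage with the original buffer.
public class BufferSliceSketch
{
  public static ByteBuffer getSlice(ByteBuffer buffer, int sliceSize, int i)
  {
    final ByteBuffer slice = buffer.duplicate();
    slice.position(sliceSize * i);
    slice.limit(sliceSize * (i + 1));
    return slice.slice(); // independent position/limit, shared bytes
  }
}
```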

@@ -179,14 +180,42 @@ disk space.

With groupBy v2, cluster operators should make sure that the off-heap hash tables and on-heap merging dictionaries
will not exceed available memory for the maximum possible concurrent query load (given by
druid.processing.numMergeBuffers).
druid.processing.numMergeBuffers). Especially the amount of direct memory needed by Druid is at least
Member

I recall there is a dedicated section about this in docs, maybe refer to it instead of copying

Contributor

Good point, this is in docs/content/operations/performance-faq

Contributor Author

Thanks. Changed.


The groupBy v2 engine uses an open addressing hash table for aggregation. The hash table is initialized with a given initial bucket number and gradually grows when the buffer is full. On hash collisions, the linear probing technique is used.

The default number of initial buckets is 1024 and the default max load factor of the hash table is 0.7. If you can see too many collisions in the hash table, you can adjust these numbers. See `bufferGrouperInitialBuckets` and `bufferGrouperMaxLoadFactor` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
Member

How could a user "see too many collisions"?

Contributor Author

@jihoonson jihoonson Oct 14, 2017

Well, this is not easy for users. I think currently the only possible way is method-level profiling. I think we need to provide some metrics for hash table collisions and growth.
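A minimal open-addressing table with linear probing, illustrating the collision handling described in the doc excerpt above. This is an int-keyed sketch, not Druid's off-heap buffer table; growth on buffer full and the load-factor check are omitted, so it assumes the table never fills up.

```java
// Sketch: open addressing with linear probing for a sum aggregation.
public class LinearProbingSketch
{
  private final int[] keys;
  private final int[] values;
  private final boolean[] used;

  public LinearProbingSketch(int numBuckets)
  {
    keys = new int[numBuckets];
    values = new int[numBuckets];
    used = new boolean[numBuckets];
  }

  // Add delta to the value aggregated for key.
  public void aggregate(int key, int delta)
  {
    int bucket = Math.floorMod(Integer.hashCode(key), keys.length);
    while (used[bucket] && keys[bucket] != key) {
      bucket = (bucket + 1) % keys.length; // linear probing on collision
    }
    used[bucket] = true;
    keys[bucket] = key;
    values[bucket] += delta;
  }

  public int get(int key)
  {
    int bucket = Math.floorMod(Integer.hashCode(key), keys.length);
    while (used[bucket]) {
      if (keys[bucket] == key) {
        return values[bucket];
      }
      bucket = (bucket + 1) % keys.length;
    }
    return 0; // key never aggregated
  }
}
```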


##### Parallel combine

Once a historical finishes aggregation using the hash table, it sorts data before sending to the broker for N-way merge aggregation in the broker. By default, historicals use all their available processing threads (configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting which is an http thread to send data to brokers.
Member

"By default, historicals use all their available processing threads (configured by druid.processing.numThreads) for aggregation" -- means, each groupBy query occupies all processing threads?

BTW, please break lines in docs, it's not convenient to comment specific thing, and also to look in IDE

Contributor Author

each groupBy query occupies all processing threads?

Yes, currently it takes all processing threads.


Once a historical finishes aggregation using the hash table, it sorts data before sending it to the broker for N-way merge aggregation there. By default, historicals use all their available processing threads (configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting, which is the HTTP thread that sends data to brokers.

This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared between all submitted queries and are _not interruptible_. This means that if a heavy query takes all available processing threads, all other queries might be blocked until the heavy query finishes. Since groupBy queries usually take longer than timeseries or topN queries, they should release processing threads as soon as possible.
Member

"they should release processing threads as soon as possible" -- "should" quite doesn't make sense to me here. "GroupBy queries" are not "proactive" neither they could affect when do they release threads

Contributor Author

What do you mean by "neither they could affect when do they release threads"?


This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared between all submitted queries and are _not interruptible_. This means that if a heavy query takes all available processing threads, all other queries might be blocked until the heavy query finishes. Since groupBy queries usually take longer than timeseries or topN queries, they should release processing threads as soon as possible.

However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck of heavy groupBy queries is the sorting. In such cases, you can use processing threads for sorting as well. This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)).
Member

"Note that parallel combine can be enabled for only when data is actually spilled"

  • Remove "for"?
  • data spilling concept is not introduced yet, making this sentence nonsense for a fresh reader.

Contributor Author

Removed "for".
Data spilling is explained in the "Memory tuning and resource limits" section linked here. Do you think it's not enough?


However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck of heavy groupBy queries is the sorting. In such cases, you can use processing threads for sorting as well. This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)).

Once parallel combine is enabled, the groupBy v2 engine creates a combining tree. Each intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge aggregates from hash tables, including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they need to read data from disk. As a result, fewer threads are used for intermediate nodes by default. You can change the degree of intermediate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
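As a rough illustration of how the combining degree trades fan-in for thread count, the sketch below counts the threads such a tree would need for a given number of leaf mergers. This is a hypothetical helper for intuition, not Druid's actual code, and it ignores the min-degree and capacity checks the real planner would apply.

```java
// Hypothetical sketch: estimate the total threads used by a combining
// tree with `leaves` leaf mergers and intermediate fan-in `degree`.
public class CombiningTree
{
  public static int threadsNeeded(int leaves, int degree)
  {
    int total = leaves;  // one thread per leaf merger
    int level = leaves;
    // Each intermediate level merges `degree` children per node until
    // a single root remains.
    while (level > 1) {
      level = (level + degree - 1) / degree;  // ceiling division
      total += level;  // one thread per intermediate node
    }
    return total;
  }
}
```

For example, 8 leaf mergers with a degree of 2 need 15 threads in total, while the same 8 leaves with a degree of 8 need only 9, which is why a higher `intermediateCombineDegree` reduces thread overhead on machines with fast cores.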
Member

"Once parallel combine is enabled, the groupBy v2 engine creates a combining tree" -- makes impression that parallel combine is enabled right during the query execution.

Member

I think better "less threads are used" instead of "A less number of threads"

Contributor Author

Thanks. Fixed.


|Server runtime property (prefixed by `druid.query.groupBy.`)|Query context parameter|Description|Default|
|-----------------------|-----------------------|-----------|-------|
|`bufferGrouperInitialBuckets`|`bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0|
Member

What is "reasonable default" here?

Contributor Author

Added to doc.

|Server runtime property (prefixed by `druid.query.groupBy.`)|Query context parameter|Description|Default|
|-----------------------|-----------------------|-----------|-------|
|`bufferGrouperInitialBuckets`|`bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0|
|`bufferGrouperMaxLoadFactor`|`bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0|
Member

And here

Contributor Author

Added to doc.

|`bufferGrouperInitialBuckets`|`bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0|
|`bufferGrouperMaxLoadFactor`|`bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0|
|`forceHashAggregation`|`forceHashAggregation`|Force to use hash-based aggregation.|false|
|`intermediateCombineDegree`|`intermediateCombineDegree`|Number of intermediate nodes combined together in the combining tree. Higher degrees require fewer threads, which can improve query performance by reducing the overhead of too many threads if the server has sufficiently powerful CPU cores.|8|
Member

"less threads"

Contributor Author

Fixed.

|`bufferGrouperMaxLoadFactor`|`bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0|
|`forceHashAggregation`|`forceHashAggregation`|Force to use hash-based aggregation.|false|
|`intermediateCombineDegree`|`intermediateCombineDegree`|Number of intermediate nodes combined together in the combining tree. Higher degrees require fewer threads, which can improve query performance by reducing the overhead of too many threads if the server has sufficiently powerful CPU cores.|8|
|`numParallelCombineThreads`|`numParallelCombineThreads`|Hint for the number of parallel combining threads. This should be larger than 1 to turn on the parallel combining feature. The actual number of threads used for parallel combining is min(`druid.query.groupBy.numParallelCombineThreads`, `druid.processing.numThreads`).|1 (disabled)|
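For example, parallel combine could be turned on per query through the query context. The datasource, dimension, and metric names below are hypothetical; only the context parameter names come from the table above.

```json
{
  "queryType": "groupBy",
  "dataSource": "sample_datasource",
  "granularity": "day",
  "dimensions": ["dim1"],
  "aggregations": [
    { "type": "longSum", "name": "total", "fieldName": "count" }
  ],
  "intervals": ["2017-01-01/2017-02-01"],
  "context": {
    "numParallelCombineThreads": 4,
    "intermediateCombineDegree": 8
  }
}
```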
Member

"The actual number of threads used for parallel combining is min(druid.query.groupBy.numParallelCombineThreads, druid.processing.numThreads)" -- unless the value is 1? IMO too complicated way to set property. Maybe add a separate boolean property, and require numParallelCombineThreads > 1, if configured.

Contributor Author

Hmm, adding another property to turn on parallel combining looks more complicated to me.
I added a check that druid.query.groupBy.numParallelCombineThreads cannot be larger than druid.processing.numThreads instead of calculating min of them.

Contributor

@jon-wei jon-wei left a comment

LGTM

@jon-wei
Contributor

jon-wei commented Oct 17, 2017

@leventov did you have any more comments for this PR?

public ResourceHolder<ByteBuffer> get()
{
  if (!initialized) {
    buffer = processingBufferPool.take();
Contributor

this changes an assumption made at historicals and realtime task processes that processing buffer pool would only be asked for buffers from processing thread (one per thread) ... that would keep the usage of processing buffer pool restricted and no processing thread would need to block to get a buffer.
( came across this while doing #5345 )

I think that this buffer should really come from merge pool. what do you think? @gianm @jihoonson

Contributor

It should definitely not be from the processing pool. The merge pool might be the right place, however if so, this should be combined with the other code in the same file that pulls from the merge pool in order to make it atomic. Otherwise we could get deadlocks.

Contributor

@jihoonson what do you think?

Contributor Author

Hmm.. I had a reason for using the processing pool when I wrote this code, but I no longer remember it. Also, it makes much more sense to use the blocking pool now. As @gianm said, the buffer should be acquired atomically at the beginning of query processing in historicals and realtime tasks.
@himanshug do you have a plan to make a patch for this? or I'll make it.

Contributor

@jihoonson please feel free to do the patch. thanks.

Contributor

@jihoonson nvmd , let me fix it in #5345 . its a small change.

Contributor Author

@himanshug thanks. I'll review that PR.
