Add streaming aggregation as the last step of ConcurrentGrouper if data are spilled #4704

Merged · 50 commits · Oct 18, 2017

Commits
0e68c08
Add steaming grouper
jihoonson Aug 19, 2017
30c0038
Fix doc
jihoonson Aug 19, 2017
c79959f
Use a single dictionary while combining
jihoonson Aug 25, 2017
21d42ce
Revert GroupByBenchmark
jihoonson Aug 25, 2017
701adb1
Removed unused code
jihoonson Aug 25, 2017
68ccece
More cleanup
jihoonson Aug 26, 2017
29a7dfe
Remove unused config
jihoonson Aug 29, 2017
48dc2cd
Fix some typos and bugs
jihoonson Sep 2, 2017
92675a9
Refactor Groupers.mergeIterators()
jihoonson Sep 2, 2017
a888db0
Add comments for combining tree
jihoonson Sep 2, 2017
7040ffb
Refactor buildCombineTree
jihoonson Sep 2, 2017
5c4b846
Refactor iterator
jihoonson Sep 2, 2017
f31f6d2
Add ParallelCombiner
jihoonson Sep 2, 2017
3cdce75
Add ParallelCombinerTest
jihoonson Sep 2, 2017
2a619b9
Handle InterruptedException
jihoonson Sep 2, 2017
3f14db4
use AbstractPrioritizedCallable
jihoonson Sep 2, 2017
dee9633
Address comments
jihoonson Sep 7, 2017
c0eecc0
Merge branch 'master' of https://github.com/druid-io/druid into strea…
jihoonson Sep 8, 2017
0f5c3a8
[maven-release-plugin] prepare release druid-0.11.0-sg
jihoonson Sep 8, 2017
5c6b31e
[maven-release-plugin] prepare for next development iteration
jihoonson Sep 8, 2017
9258453
Address comments
jihoonson Sep 9, 2017
de73645
Revert "[maven-release-plugin] prepare for next development iteration"
jihoonson Sep 9, 2017
5a87879
Revert "[maven-release-plugin] prepare release druid-0.11.0-sg"
jihoonson Sep 9, 2017
e0699c2
Fix build failure
jihoonson Sep 9, 2017
324201c
Change list to array
jihoonson Sep 10, 2017
b748da5
rename sortableIds
jihoonson Sep 10, 2017
91ed8a2
Address comments
jihoonson Sep 12, 2017
8585062
Merge branch 'master' of https://github.com/druid-io/druid into strea…
jihoonson Sep 12, 2017
a9a43b8
change to foreach loop
jihoonson Sep 12, 2017
e7144bb
Fix comment
jihoonson Sep 13, 2017
3343392
Revert keyEquals()
jihoonson Sep 13, 2017
4628c5e
Remove loop
jihoonson Sep 13, 2017
4369c3b
Address comments
jihoonson Sep 13, 2017
0beacc3
Fix build fail
jihoonson Sep 14, 2017
08d1ed6
Address comments
jihoonson Sep 19, 2017
ac5a024
Remove unused imports
jihoonson Sep 19, 2017
02264a1
Merge branch 'master' of https://github.com/druid-io/druid into strea…
jihoonson Sep 19, 2017
f168b74
Fix method name
jihoonson Sep 19, 2017
be123c1
Split intermediate and leaf combine degrees
jihoonson Sep 20, 2017
919ecbd
Add comments to StreamingMergeSortedGrouper
jihoonson Sep 21, 2017
aa1249a
Add more comments and fix overflow
jihoonson Sep 21, 2017
7a67475
Address comments
jihoonson Oct 7, 2017
766cbce
ConcurrentGrouperTest cleanup
jihoonson Oct 7, 2017
b63b37e
Merge branch 'master' of https://github.com/druid-io/druid into strea…
jihoonson Oct 7, 2017
39cf1f2
add thread number configuration for parallel combining
jihoonson Oct 12, 2017
0723960
improve doc
jihoonson Oct 13, 2017
65078c3
Merge branch 'master' of https://github.com/druid-io/druid into strea…
jihoonson Oct 14, 2017
78df53d
address comments
jihoonson Oct 14, 2017
834001a
Merge branch 'master' of https://github.com/druid-io/druid into strea…
jihoonson Oct 14, 2017
101391b
fix build
jihoonson Oct 14, 2017
64 changes: 64 additions & 0 deletions common/src/main/java/io/druid/common/utils/IntArrayUtils.java
@@ -0,0 +1,64 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.common.utils;

public class IntArrayUtils
{
  /**
   * Inverts the values of the given array with their indexes.
   * For example, the result for [2, 0, 1] is [1, 2, 0] because
   *
   * a[0]: 2 => a[2]: 0
   * a[1]: 0 => a[0]: 1
   * a[2]: 1 => a[1]: 2
   */
  public static void inverse(int[] a)
  {
    for (int i = 0; i < a.length; i++) {
      if (a[i] >= 0) {
        inverseLoop(a, i);
      }
    }

    for (int i = 0; i < a.length; i++) {
      a[i] = ~a[i];
    }
  }

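  /**
   * Follows one permutation cycle starting from {@code startValue}, writing the one's complement
   * of each inverted value so that {@link #inverse} can distinguish already-visited (negative)
   * entries from unvisited (non-negative) ones before flipping them all back at the end.
   */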
  private static void inverseLoop(int[] a, int startValue)
  {
    final int startIndex = a[startValue];

    int nextIndex = startIndex;
    int nextValue = startValue;

    do {
      final int curIndex = nextIndex;
      final int curValue = nextValue;

      nextValue = curIndex;
      nextIndex = a[curIndex];

      a[curIndex] = ~curValue;
    } while (nextIndex != startIndex);
  }

  private IntArrayUtils() {}
}
54 changes: 54 additions & 0 deletions common/src/test/java/io/druid/common/utils/IntArrayUtilsTest.java
@@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.common.utils;

import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class IntArrayUtilsTest
{
  @Test
  public void testInverse()
  {
    final int numVals = 10000;
    final Random random = new Random(System.currentTimeMillis());
    final int[] inverted = new int[numVals];
    final int[] original = new int[numVals];

    final List<Integer> ints = IntStream.range(0, numVals).boxed().collect(Collectors.toList());
    Collections.shuffle(ints, random);

    for (int i = 0; i < numVals; i++) {
      inverted[i] = ints.get(i);
      original[i] = inverted[i];
    }
    IntArrayUtils.inverse(inverted);

    for (int i = 0; i < numVals; i++) {
      Assert.assertEquals(i, inverted[original[i]]);
    }
  }
}
1 change: 1 addition & 0 deletions docs/content/operations/performance-faq.md
@@ -34,6 +34,7 @@ A useful formula for estimating direct memory usage follows:
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`

The `+1` is a fuzzy parameter meant to account for the decompression and dictionary merging buffers and may need to be adjusted based on the characteristics of the data being ingested/queried.
Operators can ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command line.
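For example, with the hypothetical settings `druid.processing.buffer.sizeBytes=1073741824` (1 GiB), `druid.processing.numMergeBuffers=2`, and `druid.processing.numThreads=7`, the formula gives 1 GiB * (2 + 7 + 1) = 10 GiB, so `-XX:MaxDirectMemorySize=10g` would cover the estimate. The values here are purely illustrative, not recommendations.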

## What is the intermediate computation buffer?
The intermediate computation buffer specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed. The default size is 1073741824 bytes (1GB).
131 changes: 94 additions & 37 deletions docs/content/querying/groupbyquery.md
@@ -49,9 +49,9 @@ An example groupBy query object is shown below:
  ],
  "intervals": [ "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000" ],
  "having": {
    "type": "greaterThan",
    "aggregation": "total_usage",
    "value": 100
  }
}
```
@@ -180,14 +180,39 @@ disk space.

With groupBy v2, cluster operators should make sure that the off-heap hash tables and on-heap merging dictionaries
will not exceed available memory for the maximum possible concurrent query load (given by
druid.processing.numMergeBuffers). See [How much direct memory does Druid use?](../operations/performance-faq.html) for more details.
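For example, setting the runtime property `druid.query.groupBy.maxOnDiskStorage` to a non-zero value (say `10000000000`, roughly 10 GB; the number is illustrative) lets merging work that outgrows the merge buffer or the merging dictionary spill to disk instead of failing the query, at the cost of disk space and slower merging.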

When using groupBy v1, all aggregation is done on-heap, and resource limits are done through the parameter
druid.query.groupBy.maxResults. This is a cap on the maximum number of results in a result set. Queries that exceed
this limit will fail with a "Resource limit exceeded" error indicating they exceeded their row limit. Cluster
operators should make sure that the on-heap aggregations will not exceed available JVM heap space for the expected
concurrent query load.

#### Performance tuning for groupBy v2

##### Limit pushdown optimization

Druid pushes down the `limit` spec in groupBy queries to the segments on historicals wherever possible to prune unnecessary intermediate results early and minimize the amount of data transferred to brokers. By default, this technique is applied only when all fields in the `orderBy` spec are a subset of the grouping keys. This is because limit push down doesn't guarantee exact results if the `orderBy` spec includes any fields that are not in the grouping keys. However, you can enable this technique even in such cases if you are willing to sacrifice some accuracy for faster query processing, as in topN queries. See `forceLimitPushDown` in [advanced groupBy v2 configurations](#groupby-v2-configurations).
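As a sketch, a query that sorts on an aggregated metric (which is not a grouping key) could opt into the push down through its context like the following; the data source, dimension, and metric names are illustrative:

```json
{
  "queryType": "groupBy",
  "dataSource": "sample_datasource",
  "granularity": "day",
  "dimensions": ["country"],
  "aggregations": [
    { "type": "longSum", "name": "total_usage", "fieldName": "user_count" }
  ],
  "intervals": ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000"],
  "limitSpec": {
    "type": "default",
    "limit": 10,
    "columns": [ { "dimension": "total_usage", "direction": "descending" } ]
  },
  "context": { "forceLimitPushDown": true }
}
```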


##### Optimizing hash table

The groupBy v2 engine uses an open addressing hash table for aggregation. The hash table is initialized with a given number of initial buckets and grows gradually as it fills up. Hash collisions are resolved with linear probing.

The default number of initial buckets is 1024 and the default maximum load factor of the hash table is 0.7. If you observe too many collisions in the hash table, you can adjust these numbers. See `bufferGrouperInitialBuckets` and `bufferGrouperMaxLoadFactor` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
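For example, a hypothetical per-query override that starts with more buckets and a lower maximum load factor could be added to the query's `context`; the numbers are arbitrary, not tuned recommendations:

```json
{
  "context": {
    "bufferGrouperInitialBuckets": 100000,
    "bufferGrouperMaxLoadFactor": 0.5
  }
}
```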
Review comment (Member): How could a user "see too many collisions"?

@jihoonson (Contributor, Author) replied on Oct 14, 2017: Well, this is not easy for users. I think currently the only possible way is method-level profiling. I think we need to provide some metrics for hash table collisions and growths.



##### Parallel combine

Once a historical finishes aggregation using the hash table, it sorts and merges the aggregates before sending them to the broker for N-way merge aggregation there. By default, historicals use all their available processing threads (configured by `druid.processing.numThreads`) for aggregation, but they sort and merge the aggregates on a single thread, the HTTP thread that sends data to brokers.

This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared between all submitted queries and they are _not interruptible_. This means that if a heavy query takes all available processing threads, all other queries might be blocked until the heavy query is finished. Since groupBy queries usually take longer than timeseries or topN queries, they should release processing threads as soon as possible.

However, you might care about the performance of some really heavy groupBy queries. The performance bottleneck of such queries is usually merging sorted aggregates, so you can use processing threads for this step as well. This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)).

Once parallel combine is enabled, the groupBy v2 engine can create a combining tree for merging sorted aggregates. Each intermediate node of the tree is a thread that merges aggregates from its child nodes. The leaf node threads read and merge aggregates from hash tables, including spilled ones. Leaf nodes are usually slower than intermediate nodes because they need to read data from disk, so fewer threads are used for intermediate nodes by default. You can change the degree of intermediate nodes; see `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
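As a sketch, parallel combine with a custom combining tree shape could be enabled per query through the context (assuming the query actually spills to disk); the thread count and degree below are illustrative:

```json
{
  "context": {
    "numParallelCombineThreads": 4,
    "intermediateCombineDegree": 8
  }
}
```

The same settings can be applied cluster-wide through `druid.query.groupBy.numParallelCombineThreads` and `druid.query.groupBy.intermediateCombineDegree`.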


#### Alternatives

There are some situations where other query types may be a better choice than groupBy.
@@ -208,55 +233,87 @@ indexing mechanism, and runs the outer query on these materialized results. "v2"
inner query's results stream with off-heap fact map and on-heap string dictionary that can spill to disk. Both
strategies perform the outer query on the broker in a single-threaded fashion.

#### Configurations

This section describes the configurations for groupBy queries. You can set system-wide configurations by adding them to runtime properties or query-specific configurations by adding them to query contexts. All runtime properties are prefixed by `druid.query.groupBy`.
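For example, `druid.query.groupBy.defaultStrategy=v2` in a node's runtime properties selects the default engine for the whole service, while adding `"groupByStrategy": "v1"` to an individual query's `context` overrides it for that query alone.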

#### Commonly tuned configurations

##### Configurations for groupBy v2

Supported runtime properties:

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|

Supported query contexts:

|Key|Description|
|---|-----------|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|


#### Advanced configurations

##### Common configurations for all groupBy strategies

Supported runtime properties:

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2|
|`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false|

Supported query contexts:

|Key|Description|
|---|-----------|
|`groupByStrategy`|Overrides the value of `druid.query.groupBy.defaultStrategy` for this query.|
|`groupByIsSingleThreaded`|Overrides the value of `druid.query.groupBy.singleThreaded` for this query.|

##### GroupBy v2 configurations

Supported runtime properties:

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default (1024).|0|
|`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default (0.7).|0|
|`druid.query.groupBy.forceHashAggregation`|Forces the use of hash-based aggregation.|false|
|`druid.query.groupBy.intermediateCombineDegree`|Number of intermediate nodes combined together in the combining tree. Higher degrees require fewer threads, which can improve query performance by reducing the overhead of too many threads if the server has sufficiently powerful CPU cores.|8|
|`druid.query.groupBy.numParallelCombineThreads`|Hint for the number of parallel combining threads. This should be larger than 1 to turn on the parallel combining feature. The actual number of threads used for parallel combining is min(`druid.query.groupBy.numParallelCombineThreads`, `druid.processing.numThreads`).|1 (disabled)|

Supported query contexts:

|Key|Description|Default|
|---|-----------|-------|
|`bufferGrouperInitialBuckets`|Overrides the value of `druid.query.groupBy.bufferGrouperInitialBuckets` for this query.|None|
|`bufferGrouperMaxLoadFactor`|Overrides the value of `druid.query.groupBy.bufferGrouperMaxLoadFactor` for this query.|None|
|`forceHashAggregation`|Overrides the value of `druid.query.groupBy.forceHashAggregation` for this query.|None|
|`intermediateCombineDegree`|Overrides the value of `druid.query.groupBy.intermediateCombineDegree` for this query.|None|
|`numParallelCombineThreads`|Overrides the value of `druid.query.groupBy.numParallelCombineThreads` for this query.|None|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|false|
|`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the broker will push limit application down to the historical nodes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false|


##### GroupBy v1 configurations

Supported runtime properties:

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows for the per-segment grouping engine. This is a tuning parameter that does not impose a hard limit; rather, it potentially shifts merging work from the per-segment engine to the overall merging index. Queries that exceed this limit will not fail.|50000|
|`druid.query.groupBy.maxResults`|Maximum number of results. Queries that exceed this limit will fail.|500000|

Supported query contexts:

|Key|Description|Default|
|---|-----------|-------|
|`maxIntermediateRows`|Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for this query.|None|
|`maxResults`|Can be used to lower the value of `druid.query.groupBy.maxResults` for this query.|None|
|`useOffheap`|Set to true to store aggregations off-heap when merging results.|false|
