Sort-merge join and hash shuffles for MSQ. #13506

Merged 29 commits on Mar 8, 2023.
Commits
69288c7  Sort-merge join and hash shuffles for MSQ. (gianm, Dec 6, 2022)
7b6c47a  Work around compiler problem. (gianm, Dec 6, 2022)
52ea257  Updates from static analysis. (gianm, Dec 6, 2022)
308b478  Fix @param tag. (gianm, Dec 7, 2022)
32b879f  Fix declared exception. (gianm, Dec 7, 2022)
d2d56f3  Fix spelling. (gianm, Dec 7, 2022)
4514d23  Minor adjustments. (gianm, Dec 7, 2022)
16a72b8  Merge branch 'master' into msq-sort-merge-join (gianm, Dec 24, 2022)
693be01  wip (gianm, Dec 25, 2022)
d2cda5d  Merge branch 'master' into msq-sort-merge-join (gianm, Feb 19, 2023)
2536198  Merge fixups (gianm, Feb 19, 2023)
2ddb89a  fixes (gianm, Feb 22, 2023)
306d403  Fix CalciteSelectQueryMSQTest (gianm, Feb 22, 2023)
fce86f9  Merge branch 'master' into msq-sort-merge-join (gianm, Feb 22, 2023)
c5b7422  Empty keys are sortable. (gianm, Feb 22, 2023)
305b957  Address comments from code review. Rename mux -> mix. (gianm, Feb 22, 2023)
3d43e84  Restore inspection config. (gianm, Feb 22, 2023)
da92c3f  Restore original doc. (gianm, Feb 22, 2023)
a416564  Reorder imports. (gianm, Feb 23, 2023)
f658454  Merge branch 'master' into msq-sort-merge-join (gianm, Feb 23, 2023)
0c0e99d  Adjustments (gianm, Feb 23, 2023)
86a6fe3  Fix. (gianm, Feb 23, 2023)
5edd5ad  Merge branch 'master' into msq-sort-merge-join (gianm, Feb 27, 2023)
288f30a  Merge branch 'master' into msq-sort-merge-join (gianm, Feb 27, 2023)
87cd12c  Fix imports. (gianm, Feb 28, 2023)
0361ba3  Merge branch 'master' into msq-sort-merge-join (gianm, Mar 7, 2023)
5b1c4b3  Adjustments from review. (gianm, Mar 7, 2023)
74fbb42  Update header. (gianm, Mar 7, 2023)
0299ca1  Adjust docs. (gianm, Mar 7, 2023)
88 changes: 88 additions & 0 deletions docs/multi-stage-query/reference.md
@@ -592,6 +592,7 @@ The following table lists the context parameters for the MSQ task engine:
| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority.| 2 |
| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include: <ul><li>`max`: Uses as many tasks as possible, up to `maxNumTasks`.</li><li>`auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 10 GiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`.</li></ul> | `max` |
| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true |
| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` |

Contributor:
This sort of thing would normally be delivered via a hint in the SQL, perhaps it's not too hard to deliver it that way?

gianm (author), Dec 7, 2022:
Unfortunately, hints are not available in the version of Calcite we use. Newer versions have this: https://calcite.apache.org/docs/reference.html#sql-hints

@abhishekagarwal87 found some reasons we couldn't do it right now, as referenced by this comment: #13153 (comment). @abhishekagarwal87 would you mind writing up your notes as to what blocks an upgrade, and making an issue about that, titled something like Upgrade Calcite past <whatever version introduced the blocking problem>? That way, we have an issue we can refer to and use to discuss possible ways to fix the blockers.

Once we sort that out, I'd like to deprecate the context parameter and move things to use hints (and eventually statistics as well) instead.

Contributor:
@gianm - Here it is. #13532.

gianm (author):
Making some progress here, described on #13532. It will require another Calcite release, so I think it's best if we merge this sort-merge join PR first, then do the Calcite upgrade and add hints.

| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
| `segmentSortOrder` | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.<br /><br />You provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`. For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list |
| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1.| 0 |
@@ -604,6 +605,91 @@ The following table lists the context parameters for the MSQ task engine:
| `intermediateSuperSorterStorageMaxLocalBytes` | SELECT, INSERT, REPLACE<br /><br />Limit, in bytes, on local storage for sorting's intermediate data. If the limit is exceeded, the task fails with `ResourceLimitExceededException`.| `9223372036854775807` |
| `maxInputBytesPerWorker` | Should be used in conjunction with the `auto` mode of `taskAssignment`. When dividing the input of a stage among the workers, this parameter determines the maximum number of bytes given to a single worker before the next worker is chosen. It is only a guideline during input slicing and does not guarantee that a worker's input cannot be larger. For example, given three files of 3, 7, and 12 GB, the query uses two workers: worker 1 receives the 3 GB and 7 GB files, and worker 2 receives the 12 GB file (see the sketch after this table). This value is used for all stages in a query. | `10737418240` |
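
To make the guideline behavior above concrete, here is a minimal, self-contained sketch (illustration only, not part of this diff) of a greedy slicer that fills one worker at a time until adding the next file would exceed `maxInputBytesPerWorker`. The class and method names are hypothetical; Druid's real slicing lives in its input-spec slicers (such as `TableInputSpecSlicer`) and also enforces per-worker file-count limits.

```java
// Illustration only, not part of this patch. Hypothetical names and simplified logic.
import java.util.ArrayList;
import java.util.List;

public class GreedySlicerSketch
{
  // Assigns file sizes (in bytes) to workers greedily: start a new worker once adding the next
  // file would push the current worker past maxBytesPerWorker. A single oversized file still goes
  // to one worker, which is why the limit is a guideline rather than a guarantee.
  public static List<List<Long>> assignToWorkers(final List<Long> fileSizes, final long maxBytesPerWorker)
  {
    final List<List<Long>> workers = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;

    for (final long size : fileSizes) {
      if (!current.isEmpty() && currentBytes + size > maxBytesPerWorker) {
        workers.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }

    if (!current.isEmpty()) {
      workers.add(current);
    }
    return workers;
  }

  public static void main(String[] args)
  {
    final long GB = 1_000_000_000L;
    // 3 GB, 7 GB, 12 GB files with the default 10 GiB limit -> [[3 GB, 7 GB], [12 GB]].
    System.out.println(assignToWorkers(List.of(3 * GB, 7 * GB, 12 * GB), 10_737_418_240L));
  }
}
```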

## Joins

Joins in multi-stage queries use one of two algorithms, based on the value of `sqlJoinAlgorithm`. It is not possible to
mix different join algorithms for different joins that appear in the same query.
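
As a usage sketch (not part of this diff): `sqlJoinAlgorithm` is an ordinary query context parameter, so one way to set it is in the `context` object of a request to the SQL task API. The host, port, and placeholder query below are assumptions; adjust them for your deployment.

```java
// Sketch: submit an MSQ query with sqlJoinAlgorithm=sortMerge via the SQL task API.
// Assumes a Router at localhost:8888; the query string is a placeholder.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitMsqQuerySketch
{
  public static void main(String[] args) throws Exception
  {
    final String body =
        "{\"query\": \"SELECT ...\","
        + " \"context\": {\"sqlJoinAlgorithm\": \"sortMerge\", \"maxNumTasks\": 3}}";

    final HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8888/druid/v2/sql/task"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();

    final HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body()); // contains the controller task's taskId on success
  }
}
```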

### Broadcast

Set `sqlJoinAlgorithm` to `broadcast`.

The default join algorithm for multi-stage queries is a broadcast hash join, which is similar to how
[joins are executed with native queries](../querying/query-execution.md#join). First, any adjacent joins are flattened
into a structure with a "base" input (the bottom-leftmost one) and other leaf inputs (the rest). Next, any subqueries
that are inputs to the join (either the base or the other leaves) are planned into independent stages. Then, the
non-base leaf inputs are all connected as broadcast inputs to the "base" stage.
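
A minimal sketch of the broadcast hash join idea described above (illustration only, not the MSQ implementation): the broadcast side is loaded into an in-memory hash table keyed on the join key, and the base side is streamed row by row and probed against it. The row representation and names are hypothetical, and the sketch assumes unique keys on the broadcast side for brevity.

```java
// Illustration only; not the MSQ implementation. Rows are {key, value} pairs; the broadcast side
// must fit in memory, while the base side is streamed.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastHashJoinSketch
{
  // LEFT JOIN semantics: every base row is emitted, joined to the broadcast value or null.
  public static List<String[]> leftJoin(final List<String[]> baseRows, final List<String[]> broadcastRows)
  {
    final Map<String, String> lookup = new HashMap<>();
    for (final String[] row : broadcastRows) {
      lookup.put(row[0], row[1]); // build phase: hash the broadcast input on the join key
    }

    final List<String[]> out = new ArrayList<>();
    for (final String[] row : baseRows) {
      out.add(new String[]{row[1], lookup.get(row[0])}); // probe phase: stream the base input
    }
    return out;
  }
}
```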

Together, all of these non-base leaf inputs must not exceed the [limit on broadcast table footprint](#limits). There
is no limit on the size of the base (leftmost) input.

Only LEFT JOIN, INNER JOIN, and CROSS JOIN are supported with `broadcast`.

Join conditions, if present, must be equalities. It is not necessary to include a join condition; for example,
`CROSS JOIN` and comma join do not require join conditions.

As an example, the following statement has a single join chain where `orders` is the base input, and `products` and
`customers` are non-base leaf inputs. The query will first read `products` and `customers`, then broadcast both to
the stage that reads `orders`. That stage loads the broadcast inputs (`products` and `customers`) in memory, and walks
through `orders` row by row. The results are then aggregated and written to the table `orders_enriched`. The broadcast
inputs (`products` and `customers`) must fall under the limit on broadcast table footprint, but the base `orders` input
can be unlimited in size.

```
REPLACE INTO orders_enriched
OVERWRITE ALL
SELECT
  orders.__time,
  products.name AS product_name,
  customers.name AS customer_name,
  SUM(orders.amount) AS amount
FROM orders
LEFT JOIN products ON orders.product_id = products.id
LEFT JOIN customers ON orders.customer_id = customers.id
GROUP BY 1, 2
PARTITIONED BY HOUR
CLUSTERED BY product_name
```

### Sort-merge

Set `sqlJoinAlgorithm` to `sortMerge`.

Multi-stage queries can use a sort-merge join algorithm. With this algorithm, each pairwise join is planned into its own
stage with two inputs. Both inputs are hash-partitioned on the same join key and sorted by that key. This approach is
generally less performant, but more scalable, than `broadcast`. There are various scenarios where a broadcast join
would return a [`BroadcastTablesTooLarge`](#errors) error, but a sort-merge join would succeed.

There is no limit on the overall size of either input, so sort-merge is a good choice for performing a join of two large
inputs, or for performing a self-join of a large input with itself.

There is a limit on the amount of data associated with each individual key. If _both_ sides of the join exceed this
limit, the query returns a [`TooManyRowsWithSameKey`](#errors) error. If only one side exceeds the limit, the query
does not return this error.
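
As a rough sketch of how this limit comes into play (illustration only; the real processor is `SortMergeJoinFrameProcessor` and operates on frames, not objects): both inputs arrive sorted by key, rows sharing the current key are buffered so the two runs can be cross-joined, and exceeding the buffer limit produces the error. The names and byte accounting below are simplified assumptions; the sketch buffers both runs, whereas only one side actually needs to be fully buffered, which is presumably why exceeding the limit on a single side is tolerated.

```java
// Illustration only. Inputs are iterators of {key, value} rows, each sorted ascending by key.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SortMergeJoinSketch
{
  static final long MAX_BUFFERED_BYTES = 10_000_000; // mirrors Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN

  public static void innerJoin(final Iterator<String[]> left, final Iterator<String[]> right)
  {
    String[] l = next(left);
    String[] r = next(right);

    while (l != null && r != null) {
      final int cmp = l[0].compareTo(r[0]);
      if (cmp < 0) {
        l = next(left);   // advance the side with the smaller key
      } else if (cmp > 0) {
        r = next(right);
      } else {
        // Keys match: buffer the run of equal-key rows from each side, then emit the cross product.
        final String key = l[0];
        final List<String[]> leftRun = new ArrayList<>();
        final List<String[]> rightRun = new ArrayList<>();
        long leftBytes = 0;
        long rightBytes = 0;
        while (l != null && key.equals(l[0])) {
          leftBytes += l[1].length();
          leftRun.add(l);
          l = next(left);
        }
        while (r != null && key.equals(r[0])) {
          rightBytes += r[1].length();
          rightRun.add(r);
          r = next(right);
        }
        if (leftBytes > MAX_BUFFERED_BYTES && rightBytes > MAX_BUFFERED_BYTES) {
          throw new IllegalStateException("TooManyRowsWithSameKey: " + key);
        }
        for (final String[] lr : leftRun) {
          for (final String[] rr : rightRun) {
            System.out.println(key + ": " + lr[1] + " x " + rr[1]);
          }
        }
      }
    }
  }

  private static String[] next(final Iterator<String[]> it)
  {
    return it.hasNext() ? it.next() : null;
  }
}
```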

Join conditions, if present, must be equalities. It is not necessary to include a join condition; for example,
`CROSS JOIN` and comma join do not require join conditions.

All join types are supported with `sortMerge`: LEFT, RIGHT, INNER, FULL, and CROSS.

As an example, the following statement runs using a single sort-merge join stage that receives `eventstream`
(partitioned on `user_id`) and `users` (partitioned on `id`) as inputs. There is no limit on the size of either input.

```
REPLACE INTO eventstream_enriched
OVERWRITE ALL
SELECT
  eventstream.__time,
  eventstream.user_id,
  eventstream.event_type,
  eventstream.event_details,
  users.signup_date AS user_signup_date
FROM eventstream
LEFT JOIN users ON eventstream.user_id = users.id
PARTITIONED BY HOUR
CLUSTERED BY user_id
```

## Sketch Merging Mode
This section details the advantages and performance of various Cluster By Statistics Merge Modes.

@@ -656,6 +742,7 @@ The following table lists query limits:
| Number of cluster by columns that can appear in a stage | 1,500 | [`TooManyClusteredByColumns`](#error_TooManyClusteredByColumns) |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | [`TooManyWorkers`](#error_TooManyWorkers) |
| Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | [`BroadcastTablesTooLarge`](#error_BroadcastTablesTooLarge) |
| Maximum memory occupied by buffered data during sort-merge join. Only relevant when `sqlJoinAlgorithm` is `sortMerge`. | 10 MB | `TooManyRowsWithSameKey` |
| Maximum relaunch attempts per worker. Initial run is not a relaunch. The worker will be spawned 1 + `workerRelaunchLimit` times before the job fails. | 2 | `TooManyAttemptsForWorker` |
| Maximum relaunch attempts for a job across all workers. | 100 | `TooManyAttemptsForJob` |
<a name="errors"></a>
@@ -688,6 +775,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_TooManyInputFiles">`TooManyInputFiles`</a> | Exceeded the maximum number of input files or segments per worker (10,000 files or segments).<br /><br />If you encounter this limit, consider adding more workers, or breaking up your query into smaller queries that process fewer files or segments per query. | `numInputFiles`: The total number of input files/segments for the stage.<br /><br />`maxInputFiles`: The maximum number of input files/segments per worker per stage.<br /><br />`minNumWorker`: The minimum number of workers required for a successful run. |
| <a name="error_TooManyPartitions">`TooManyPartitions`</a> | Exceeded the maximum number of partitions for a stage (25,000 partitions).<br /><br />This can occur with INSERT or REPLACE statements that generate large numbers of segments, since each segment is associated with a partition. If you encounter this limit, consider breaking up your INSERT or REPLACE statement into smaller statements that process less data per statement. | `maxPartitions`: The limit on partitions which was exceeded |
| <a name="error_TooManyClusteredByColumns">`TooManyClusteredByColumns`</a> | Exceeded the maximum number of clustering columns for a stage (1,500 columns).<br /><br />This can occur with `CLUSTERED BY`, `ORDER BY`, or `GROUP BY` with a large number of columns. | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded.`stage`: The stage number exceeding the limit<br /><br /> |
| <a name="error_TooManyRowsWithSameKey">`TooManyRowsWithSameKey`</a> | The number of rows for a given key exceeded the maximum number of buffered bytes on both sides of a join. See the [Limits](#limits) table for the specific limit. Only occurs when `sqlJoinAlgorithm` is `sortMerge`. | `key`: The key that had a large number of rows.<br /><br />`numBytes`: Number of bytes buffered, which may include other keys.<br /><br />`maxBytes`: Maximum number of bytes buffered. |
| <a name="error_TooManyColumns">`TooManyColumns`</a> | Exceeded the maximum number of columns for a stage (2,000 columns). | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded. |
| <a name="error_TooManyWarnings">`TooManyWarnings`</a> | Exceeded the maximum allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
| <a name="error_TooManyWorkers">`TooManyWorkers`</a> | Exceeded the maximum number of simultaneously-running workers. See the [Limits](#limits) table for more details. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. If this is lower than the hard limit (1,000 workers), then you can increase the limit by adding more memory to each task. |
@@ -48,9 +48,10 @@
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.ClusterByPartition;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.key.RowKeyReader;
import org.apache.druid.frame.key.SortColumn;
import org.apache.druid.frame.processor.FrameProcessorExecutor;
import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.frame.util.DurableStorageUtils;
@@ -135,12 +136,12 @@
import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.input.table.TableInputSpecSlicer;
import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.kernel.TargetSizeShuffleSpec;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernel;
import org.apache.druid.msq.kernel.controller.ControllerStagePhase;
@@ -701,8 +702,8 @@ public void updatePartialKeyStatisticsInformation(
final StageDefinition stageDef = queryKernel.getStageDefinition(stageId);
final ObjectMapper mapper = MSQTasks.decorateObjectMapperForKeyCollectorSnapshot(
context.jsonMapper(),
stageDef.getShuffleSpec().get().getClusterBy(),
stageDef.getShuffleSpec().get().doesAggregateByClusterKey()
stageDef.getShuffleSpec().clusterBy(),
stageDef.getShuffleSpec().doesAggregate()
Comment on lines +702 to +703
Contributor:
You hate the word "get"/"is"? "getClusterBy()" "isDoesAggregate()"?

gianm (author):
I don't love them for holder objects that don't "do" anything. Just personal preference I guess.

);

final PartialKeyStatisticsInformation partialKeyStatisticsInformation;
@@ -1520,7 +1521,7 @@ private static QueryDefinition makeQueryDefinition(

if (MSQControllerTask.isIngestion(querySpec)) {
shuffleSpecFactory = (clusterBy, aggregate) ->
new TargetSizeShuffleSpec(
new GlobalSortTargetSizeShuffleSpec(
clusterBy,
tuningConfig.getRowsPerSegment(),
aggregate
@@ -1746,7 +1747,7 @@ private static List<String> computeShardColumns(
final ColumnMappings columnMappings
)
{
final List<SortColumn> clusterByColumns = clusterBy.getColumns();
final List<KeyColumn> clusterByColumns = clusterBy.getColumns();
final List<String> shardColumns = new ArrayList<>();
final boolean boosted = isClusterByBoosted(clusterBy);
final int numShardColumns = clusterByColumns.size() - clusterBy.getBucketByCount() - (boosted ? 1 : 0);
@@ -1756,11 +1757,11 @@
}

for (int i = clusterBy.getBucketByCount(); i < clusterBy.getBucketByCount() + numShardColumns; i++) {
final SortColumn column = clusterByColumns.get(i);
final KeyColumn column = clusterByColumns.get(i);
final List<String> outputColumns = columnMappings.getOutputColumnsForQueryColumn(column.columnName());

// DimensionRangeShardSpec only handles ascending order.
if (column.descending()) {
if (column.order() != KeyOrder.ASCENDING) {
return Collections.emptyList();
}

@@ -1842,8 +1843,8 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
// Note: this doesn't work when CLUSTERED BY specifies an expression that is not being selected.
// Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows
// within an individual segment.
for (final SortColumn clusterByColumn : queryClusterBy.getColumns()) {
if (clusterByColumn.descending()) {
for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) {
if (clusterByColumn.order() == KeyOrder.DESCENDING) {
throw new MSQException(new InsertCannotOrderByDescendingFault(clusterByColumn.columnName()));
}

@@ -2418,7 +2419,7 @@ private void startStages() throws IOException, InterruptedException
segmentsToGenerate = generateSegmentIdsWithShardSpecs(
(DataSourceMSQDestination) task.getQuerySpec().getDestination(),
queryKernel.getStageDefinition(shuffleStageId).getSignature(),
queryKernel.getStageDefinition(shuffleStageId).getShuffleSpec().get().getClusterBy(),
queryKernel.getStageDefinition(shuffleStageId).getClusterBy(),
partitionBoundaries,
mayHaveMultiValuedClusterByFields
);
@@ -69,6 +69,12 @@ public class Limits
*/
public static final int MAX_KERNEL_MANIPULATION_QUEUE_SIZE = 100_000;

/**
* Maximum number of bytes buffered for each side of a
* {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor}, not counting the most recent frame read.
*/
public static final int MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN = 10_000_000;

/**
* Maximum relaunches across all workers.
*/
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.kernel.QueryDefinition;
@@ -73,4 +74,9 @@ public interface WorkerContext
DruidNode selfNode();

Bouncer processorBouncer();

default File tempDir(int stageNumber, String id)
Contributor:
Nit brigade! I think we tend to use tmp in other parts of the code?

gianm (author):
I was modeling it off File tempDir() that is already here without args. I can change both? This one doesn't matter much to me either way. Thoughts?

Contributor:
I like consistency. If it's always tmp then it's much easier to say that next time it should be tmp as well.

{
return new File(StringUtils.format("%s/stage_%02d/%s", tempDir(), stageNumber, id));
}
}