Skip to content

Commit

Permalink
Sort-merge join and hash shuffles for MSQ. (apache#13506)
Browse files Browse the repository at this point in the history
* Sort-merge join and hash shuffles for MSQ.

The main changes are in the processing, multi-stage-query, and sql modules.

processing module:

1) Rename SortColumn to KeyColumn, replace boolean descending with KeyOrder.
   This makes it nicer to model hash keys, which use KeyOrder.NONE.

2) Add nullability checkers to the FieldReader interface, and an
   "isPartiallyNullKey" method to FrameComparisonWidget. The join
   processor uses this to detect null keys.

3) Add WritableFrameChannel.isClosed and OutputChannel.isReadableChannelReady
   so callers can tell which OutputChannels are ready for reading and which
   aren't.

4) Specialize FrameProcessors.makeCursor to return FrameCursor, a random-access
   implementation. The join processor uses this to rewind when it needs to
   replay a set of rows with a particular key.

5) Add MemoryAllocatorFactory, which is embedded inside FrameWriterFactory
   instead of a particular MemoryAllocator. This allows FrameWriterFactory
   to be shared in more scenarios.

multi-stage-query module:

1) ShuffleSpec: Add hash-based shuffles. New enum ShuffleKind helps callers
   figure out what kind of shuffle is happening. The change from SortColumn
   to KeyColumn allows ClusterBy to be used for both hash-based and sort-based
   shuffling.

2) WorkerImpl: Add ability to handle hash-based shuffles. Refactor the logic
   to be more readable by moving the work-order-running code to the inner
   class RunWorkOrder, and the shuffle-pipeline-building code to the inner
   class ShufflePipelineBuilder.

3) Add SortMergeJoinFrameProcessor and factory.

4) WorkerMemoryParameters: Adjust logic to reserve space for output frames
   for hash partitioning. (We need one frame per partition.)

sql module:

1) Add sqlJoinAlgorithm context parameter; can be "broadcast" or
   "sortMerge". With native, it must always be "broadcast", or it's a
   validation error. MSQ supports both. Default is "broadcast" in
   both engines.

2) Validate that MSQs do not use broadcast join with RIGHT or FULL join,
   as results are not correct for broadcast join with those types. Allow
   this in native for two reasons: legacy (the docs caution against it,
   but it's always been allowed), and the fact that it actually *does*
   generate correct results in native when the join is processed on the
   Broker. It is much less likely that MSQ will plan in such a way that
   generates correct results.

3) Remove subquery penalty in DruidJoinQueryRel when using sort-merge
   join, because subqueries are always required, so there's no reason
   to penalize them.

4) Move previously-disabled join reordering and manipulation rules to
   FANCY_JOIN_RULES, and enable them when using sort-merge join. Helps
   get to better plans where projections and filters are pushed down.

* Work around compiler problem.

* Updates from static analysis.

* Fix @param tag.

* Fix declared exception.

* Fix spelling.

* Minor adjustments.

* wip

* Merge fixups

* fixes

* Fix CalciteSelectQueryMSQTest

* Empty keys are sortable.

* Address comments from code review. Rename mux -> mix.

* Restore inspection config.

* Restore original doc.

* Reorder imports.

* Adjustments

* Fix.

* Fix imports.

* Adjustments from review.

* Update header.

* Adjust docs.
  • Loading branch information
gianm authored and 317brian committed Mar 10, 2023
1 parent 5f8590f commit f219371
Show file tree
Hide file tree
Showing 153 changed files with 6,835 additions and 1,630 deletions.
89 changes: 89 additions & 0 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ The following table lists the context parameters for the MSQ task engine:
| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority.| 2 |
| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include: <ul><li>`max`: Uses as many tasks as possible, up to `maxNumTasks`.</li><li>`auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 10 GiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`.</li></ul> | `max` |
| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true |
| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` |
| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
| `segmentSortOrder` | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.<br /><br />You provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`. For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list |
| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1.| 0 |
Expand All @@ -604,6 +605,92 @@ The following table lists the context parameters for the MSQ task engine:
| `intermediateSuperSorterStorageMaxLocalBytes` | SELECT, INSERT, REPLACE<br /><br /> Whether to enable a byte limit on local storage for sorting's intermediate data. If that limit is crossed, the task fails with `ResourceLimitExceededException`.| `9223372036854775807` |
| `maxInputBytesPerWorker` | Should be used in conjunction with taskAssignment `auto` mode. When dividing the input of a stage among the workers, this parameter determines the maximum size in bytes that are given to a single worker before the next worker is chosen. This parameter is only used as a guideline during input slicing, and does not guarantee that a the input cannot be larger. For example, we have 3 files. 3, 7, 12 GB each. then we would end up using 2 worker: worker 1 -> 3, 7 and worker 2 -> 12. This value is used for all stages in a query. | `10737418240` |
## Joins
Joins in multi-stage queries use one of two algorithms, based on the [context parameter](#context-parameters)
`sqlJoinAlgorithm`. This context parameter applies to the entire SQL statement, so it is not possible to mix different
join algorithms in the same query.
### Broadcast
Set `sqlJoinAlgorithm` to `broadcast`.
The default join algorithm for multi-stage queries is a broadcast hash join, which is similar to how
[joins are executed with native queries](../querying/query-execution.md#join). First, any adjacent joins are flattened
into a structure with a "base" input (the bottom-leftmost one) and other leaf inputs (the rest). Next, any subqueries
that are inputs the join (either base or other leafs) are planned into independent stages. Then, the non-base leaf
inputs are all connected as broadcast inputs to the "base" stage.
Together, all of these non-base leaf inputs must not exceed the [limit on broadcast table footprint](#limits). There
is no limit on the size of the base (leftmost) input.
Only LEFT JOIN, INNER JOIN, and CROSS JOIN are supported with with `broadcast`.
Join conditions, if present, must be equalities. It is not necessary to include a join condition; for example,
`CROSS JOIN` and comma join do not require join conditions.
As an example, the following statement has a single join chain where `orders` is the base input, and `products` and
`customers` are non-base leaf inputs. The query will first read `products` and `customers`, then broadcast both to
the stage that reads `orders`. That stage loads the broadcast inputs (`products` and `customers`) in memory, and walks
through `orders` row by row. The results are then aggregated and written to the table `orders_enriched`. The broadcast
inputs (`products` and `customers`) must fall under the limit on broadcast table footprint, but the base `orders` input
can be unlimited in size.
```
REPLACE INTO orders_enriched
OVERWRITE ALL
SELECT
orders.__time,
products.name AS product_name,
customers.name AS customer_name,
SUM(orders.amount) AS amount
FROM orders
LEFT JOIN products ON orders.product_id = products.id
LEFT JOIN customers ON orders.customer_id = customers.id
GROUP BY 1, 2
PARTITIONED BY HOUR
CLUSTERED BY product_name
```
### Sort-merge
Set `sqlJoinAlgorithm` to `sortMerge`.
Multi-stage queries can use a sort-merge join algorithm. With this algorithm, each pairwise join is planned into its own
stage with two inputs. The two inputs are partitioned and sorted using a hash partitioning on the same key. This
approach is generally less performant, but more scalable, than `broadcast`. There are various scenarios where broadcast
join would return a [`BroadcastTablesTooLarge`](#errors) error, but a sort-merge join would succeed.
There is no limit on the overall size of either input, so sort-merge is a good choice for performing a join of two large
inputs, or for performing a self-join of a large input with itself.
There is a limit on the amount of data associated with each individual key. If _both_ sides of the join exceed this
limit, the query returns a [`TooManyRowsWithSameKey`](#errors) error. If only one side exceeds the limit, the query
does not return this error.
Join conditions, if present, must be equalities. It is not necessary to include a join condition; for example,
`CROSS JOIN` and comma join do not require join conditions.
All join types are supported with `sortMerge`: LEFT, RIGHT, INNER, FULL, and CROSS.
As an example, the following statement runs using a single sort-merge join stage that receives `eventstream`
(partitioned on `user_id`) and `users` (partitioned on `id`) as inputs. There is no limit on the size of either input.
```
REPLACE INTO eventstream_enriched
OVERWRITE ALL
SELECT
eventstream.__time,
eventstream.user_id,
eventstream.event_type,
eventstream.event_details,
users.signup_date AS user_signup_date
FROM eventstream
LEFT JOIN users ON eventstream.user_id = users.id
PARTITIONED BY HOUR
CLUSTERED BY user
```
## Sketch Merging Mode
This section details the advantages and performance of various Cluster By Statistics Merge Modes.
Expand Down Expand Up @@ -656,6 +743,7 @@ The following table lists query limits:
| Number of cluster by columns that can appear in a stage | 1,500 | [`TooManyClusteredByColumns`](#error_TooManyClusteredByColumns) |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | [`TooManyWorkers`](#error_TooManyWorkers) |
| Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | [`BroadcastTablesTooLarge`](#error_BroadcastTablesTooLarge) |
| Maximum memory occupied by buffered data during sort-merge join. Only relevant when `sqlJoinAlgorithm` is `sortMerge`. | 10 MB | `TooManyRowsWithSameKey` |
| Maximum relaunch attempts per worker. Initial run is not a relaunch. The worker will be spawned 1 + `workerRelaunchLimit` times before the job fails. | 2 | `TooManyAttemptsForWorker` |
| Maximum relaunch attempts for a job across all workers. | 100 | `TooManyAttemptsForJob` |
<a name="errors"></a>
Expand Down Expand Up @@ -687,6 +775,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_TooManyInputFiles">`TooManyInputFiles`</a> | Exceeded the maximum number of input files or segments per worker (10,000 files or segments).<br /><br />If you encounter this limit, consider adding more workers, or breaking up your query into smaller queries that process fewer files or segments per query. | `numInputFiles`: The total number of input files/segments for the stage.<br /><br />`maxInputFiles`: The maximum number of input files/segments per worker per stage.<br /><br />`minNumWorker`: The minimum number of workers required for a successful run. |
| <a name="error_TooManyPartitions">`TooManyPartitions`</a> | Exceeded the maximum number of partitions for a stage (25,000 partitions).<br /><br />This can occur with INSERT or REPLACE statements that generate large numbers of segments, since each segment is associated with a partition. If you encounter this limit, consider breaking up your INSERT or REPLACE statement into smaller statements that process less data per statement. | `maxPartitions`: The limit on partitions which was exceeded |
| <a name="error_TooManyClusteredByColumns">`TooManyClusteredByColumns`</a> | Exceeded the maximum number of clustering columns for a stage (1,500 columns).<br /><br />This can occur with `CLUSTERED BY`, `ORDER BY`, or `GROUP BY` with a large number of columns. | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded.`stage`: The stage number exceeding the limit<br /><br /> |
| <a name="error_TooManyRowsWithSameKey">`TooManyRowsWithSameKey`</a> | The number of rows for a given key exceeded the maximum number of buffered bytes on both sides of a join. See the [Limits](#limits) table for the specific limit. Only occurs when `sqlJoinAlgorithm` is `sortMerge`. | `key`: The key that had a large number of rows.<br /><br />`numBytes`: Number of bytes buffered, which may include other keys.<br /><br />`maxBytes`: Maximum number of bytes buffered. |
| <a name="error_TooManyColumns">`TooManyColumns`</a> | Exceeded the maximum number of columns for a stage (2,000 columns). | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded. |
| <a name="error_TooManyWarnings">`TooManyWarnings`</a> | Exceeded the maximum allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
| <a name="error_TooManyWorkers">`TooManyWorkers`</a> | Exceeded the maximum number of simultaneously-running workers. See the [Limits](#limits) table for more details. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. If this is lower than the hard limit (1,000 workers), then you can increase the limit by adding more memory to each task. |
Expand Down
16 changes: 7 additions & 9 deletions docs/querying/datasource.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,10 @@ GROUP BY
Join datasources allow you to do a SQL-style join of two datasources. Stacking joins on top of each other allows
you to join arbitrarily many datasources.

In Druid {{DRUIDVERSION}}, joins are implemented with a broadcast hash-join algorithm. This means that all datasources
other than the leftmost "base" datasource must fit in memory. It also means that the join condition must be an equality. This
feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup), [inline](#inline), and
[query](#query) datasources.
In Druid {{DRUIDVERSION}}, joins in native queries are implemented with a broadcast hash-join algorithm. This means
that all datasources other than the leftmost "base" datasource must fit in memory. It also means that the join condition
must be an equality. This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup),
[inline](#inline), and [query](#query) datasources.

Refer to the [Query execution](query-execution.md#join) page for more details on how queries are executed when you
use join datasources.
Expand Down Expand Up @@ -362,13 +362,11 @@ Also, as a result of this, comma joins should be avoided.
Joins are an area of active development in Druid. The following features are missing today but may appear in
future versions:

- Reordering of predicates and filters (pushing up and/or pushing down) to get the most performant plan.
- Reordering of join operations to get the most performant plan.
- Preloaded dimension tables that are wider than lookups (i.e. supporting more than a single key and single value).
- RIGHT OUTER and FULL OUTER joins. Currently, they are partially implemented. Queries will run but results will not
always be correct.
- RIGHT OUTER and FULL OUTER joins in the native query engine. Currently, they are partially implemented. Queries run
but results are not always correct.
- Performance-related optimizations as mentioned in the [previous section](#join-performance).
- Join algorithms other than broadcast hash-joins.
- Join condition on a column compared to a constant value.
- Join conditions on a column containing a multi-value dimension.

### `unnest`
Expand Down
4 changes: 3 additions & 1 deletion docs/querying/joins.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ Apache Druid has two features related to joining of data:

1. [Join](datasource.md#join) operators. These are available using a [join datasource](datasource.md#join) in native
queries, or using the [JOIN operator](sql.md) in Druid SQL. Refer to the
[join datasource](datasource.md#join) documentation for information about how joins work in Druid.
[join datasource](datasource.md#join) documentation for information about how joins work in Druid native queries,
or the [multi-stage query join documentation](../multi-stage-query/reference.md#joins) for information about how joins
work in multi-stage query tasks.
2. [Query-time lookups](lookups.md), simple key-to-value mappings. These are preloaded on all servers that are involved
in queries and can be accessed with or without an explicit join operator. Refer to the [lookups](lookups.md)
documentation for more details.
Expand Down
Loading

0 comments on commit f219371

Please sign in to comment.