Sort-merge join and hash shuffles for MSQ. #13506
@@ -44,9 +44,10 @@
 import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.frame.key.ClusterByPartition;
 import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
 import org.apache.druid.frame.key.RowKey;
 import org.apache.druid.frame.key.RowKeyReader;
-import org.apache.druid.frame.key.SortColumn;
 import org.apache.druid.frame.processor.FrameProcessorExecutor;
 import org.apache.druid.frame.processor.FrameProcessors;
 import org.apache.druid.indexer.TaskState;
@@ -130,12 +131,12 @@
 import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
 import org.apache.druid.msq.input.table.TableInputSpec;
 import org.apache.druid.msq.input.table.TableInputSpecSlicer;
+import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.msq.kernel.StagePartition;
-import org.apache.druid.msq.kernel.TargetSizeShuffleSpec;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernel;
 import org.apache.druid.msq.kernel.controller.ControllerStagePhase;
@@ -595,8 +596,8 @@ public void updatePartialKeyStatisticsInformation(int stageNumber, int workerNum
     final StageDefinition stageDef = queryKernel.getStageDefinition(stageId);
     final ObjectMapper mapper = MSQTasks.decorateObjectMapperForKeyCollectorSnapshot(
         context.jsonMapper(),
-        stageDef.getShuffleSpec().get().getClusterBy(),
-        stageDef.getShuffleSpec().get().doesAggregateByClusterKey()
+        stageDef.getShuffleSpec().clusterBy(),
+        stageDef.getShuffleSpec().doesAggregate()
     );

     final PartialKeyStatisticsInformation partialKeyStatisticsInformation;

Comment on lines +702 to +703: You hate the word "get"/"is"? "getClusterBy()", "isDoesAggregate()"?

Reply: I don't love them for holder objects that don't "do" anything. Just personal preference, I guess.
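The renamed accessors tie into that naming exchange: the new ShuffleSpec methods drop the JavaBean-style prefixes. As a rough, hypothetical sketch of the style being discussed (only the two method names visible in this hunk; not the full Druid ShuffleSpec interface):

```java
import org.apache.druid.frame.key.ClusterBy;

// Hypothetical sketch of the plain-noun accessor style discussed above; not the
// real org.apache.druid.msq.kernel.ShuffleSpec interface.
interface ShuffleSpecAccessorsSketch
{
  // Plain noun instead of getClusterBy(): the key this stage shuffles by.
  ClusterBy clusterBy();

  // Plain verb phrase instead of doesAggregateByClusterKey(): whether rows
  // sharing a key are aggregated during the shuffle.
  boolean doesAggregate();
}
```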
@@ -1361,7 +1362,7 @@ private static QueryDefinition makeQueryDefinition(

     if (MSQControllerTask.isIngestion(querySpec)) {
       shuffleSpecFactory = (clusterBy, aggregate) ->
-          new TargetSizeShuffleSpec(
+          new GlobalSortTargetSizeShuffleSpec(
               clusterBy,
               tuningConfig.getRowsPerSegment(),
               aggregate
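For context, the lambda above is a two-argument factory: given a cluster-by key and an aggregate flag, it builds a shuffle spec targeting rowsPerSegment rows per partition. A hypothetical sketch of that shape (the actual MSQ factory interface is not shown in this hunk and may differ):

```java
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;

// Hypothetical sketch only: mirrors the (clusterBy, aggregate) -> spec shape of
// the lambda in the hunk above; not the actual MSQ factory interface.
@FunctionalInterface
interface ShuffleSpecFactorySketch
{
  Object build(ClusterBy clusterBy, boolean aggregate);
}

class ShuffleSpecFactoryExample
{
  // Illustrative stand-in for tuningConfig.getRowsPerSegment(); value is arbitrary.
  static final int ROWS_PER_SEGMENT = 3_000_000;

  // Mirrors the ingestion branch above: a global-sort shuffle sized by rows per segment.
  static final ShuffleSpecFactorySketch FACTORY = (clusterBy, aggregate) ->
      new GlobalSortTargetSizeShuffleSpec(clusterBy, ROWS_PER_SEGMENT, aggregate);
}
```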
@@ -1583,7 +1584,7 @@ private static List<String> computeShardColumns(
       final ColumnMappings columnMappings
   )
   {
-    final List<SortColumn> clusterByColumns = clusterBy.getColumns();
+    final List<KeyColumn> clusterByColumns = clusterBy.getColumns();
     final List<String> shardColumns = new ArrayList<>();
     final boolean boosted = isClusterByBoosted(clusterBy);
     final int numShardColumns = clusterByColumns.size() - clusterBy.getBucketByCount() - (boosted ? 1 : 0);
@@ -1593,11 +1594,11 @@
     }

     for (int i = clusterBy.getBucketByCount(); i < clusterBy.getBucketByCount() + numShardColumns; i++) {
-      final SortColumn column = clusterByColumns.get(i);
+      final KeyColumn column = clusterByColumns.get(i);
       final List<String> outputColumns = columnMappings.getOutputColumnsForQueryColumn(column.columnName());

       // DimensionRangeShardSpec only handles ascending order.
-      if (column.descending()) {
+      if (column.order() != KeyOrder.ASCENDING) {
         return Collections.emptyList();
       }

@@ -1679,8 +1680,8 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
     // Note: this doesn't work when CLUSTERED BY specifies an expression that is not being selected.
     // Such fields in CLUSTERED BY still control partitioning as expected, but do not affect sort order of rows
     // within an individual segment.
-    for (final SortColumn clusterByColumn : queryClusterBy.getColumns()) {
-      if (clusterByColumn.descending()) {
+    for (final KeyColumn clusterByColumn : queryClusterBy.getColumns()) {
+      if (clusterByColumn.order() == KeyOrder.DESCENDING) {
        throw new MSQException(new InsertCannotOrderByDescendingFault(clusterByColumn.columnName()));
      }

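The two hunks above swap SortColumn's boolean descending() for checks against the KeyOrder enum. A minimal standalone sketch of that pattern, assuming only the ClusterBy.getColumns() and KeyColumn.order() accessors that appear in this diff:

```java
import java.util.List;

import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;

// Illustrative helper, not part of the PR: mirrors the ascending-order validation
// pattern used in computeShardColumns above.
final class KeyOrderChecks
{
  private KeyOrderChecks() {}

  // Returns true only if every key column sorts ascending. With the old SortColumn
  // class this was a boolean descending() check; KeyOrder is an enum, so the
  // comparison is made against KeyOrder.ASCENDING explicitly.
  static boolean allAscending(final ClusterBy clusterBy)
  {
    final List<KeyColumn> columns = clusterBy.getColumns();
    for (final KeyColumn column : columns) {
      if (column.order() != KeyOrder.ASCENDING) {
        return false;
      }
    }
    return true;
  }
}
```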
@@ -2123,7 +2124,7 @@ private void startStages() throws IOException, InterruptedException
       segmentsToGenerate = generateSegmentIdsWithShardSpecs(
           (DataSourceMSQDestination) task.getQuerySpec().getDestination(),
           queryKernel.getStageDefinition(shuffleStageId).getSignature(),
-          queryKernel.getStageDefinition(shuffleStageId).getShuffleSpec().get().getClusterBy(),
+          queryKernel.getStageDefinition(shuffleStageId).getClusterBy(),
           partitionBoundaries,
           mayHaveMultiValuedClusterByFields
       );
This sort of thing would normally be delivered via a hint in the SQL; perhaps it's not too hard to deliver it that way?

Reply: Unfortunately, hints are not available in the version of Calcite we use. Newer versions have this: https://calcite.apache.org/docs/reference.html#sql-hints. @abhishekagarwal87 found some reasons we couldn't do it right now, as referenced by this comment: #13153 (comment). @abhishekagarwal87, would you mind writing up your notes as to what blocks an upgrade, and making an issue about that, titled something like "Upgrade Calcite past <whatever version introduced the blocking problem>"? That way, we have an issue we can refer to and use to discuss possible ways to fix the blockers. Once we sort that out, I'd like to deprecate the context parameter and move things to use hints (and eventually statistics as well) instead.

Reply: @gianm - Here it is: #13532.

Reply: Making some progress here, described on #13532. It will require another Calcite release, so I think it's best if we merge this sort-merge join PR first, then do the Calcite upgrade and add hints.