Sort-merge join and hash shuffles for MSQ. #13506
Conversation
Adds a distributed sort-merge join algorithm for MSQ. The main changes are in the processing, multi-stage-query, and sql modules.

processing module:

1) Rename SortColumn to KeyColumn, replace boolean descending with KeyOrder. This makes it nicer to model hash keys, which use KeyOrder.NONE (sketched below).
2) Add nullability checkers to the FieldReader interface, and an "isPartiallyNullKey" method to FrameComparisonWidget. The join processor uses this to detect null keys.
3) Add WritableFrameChannel.isClosed and OutputChannel.isReadableChannelReady so callers can tell which OutputChannels are ready for reading and which aren't.
4) Specialize FrameProcessors.makeCursor to return FrameCursor, a random-access implementation. The join processor uses this to rewind when it needs to replay a set of rows with a particular key.
5) Add MemoryAllocatorFactory, which is embedded inside FrameWriterFactory instead of a particular MemoryAllocator. This allows FrameWriterFactory to be shared in more scenarios.

multi-stage-query module:

1) ShuffleSpec: Add hash-based shuffles. New enum ShuffleKind helps callers figure out what kind of shuffle is happening. The change from SortColumn to KeyColumn allows ClusterBy to be used for both hash-based and sort-based shuffling.
2) WorkerImpl: Add ability to handle hash-based shuffles. Refactor the logic to be more readable by moving the work-order-running code to the inner class RunWorkOrder, and the shuffle-pipeline-building code to the inner class ShufflePipelineBuilder.
3) Add SortMergeJoinFrameProcessor and factory.
4) WorkerMemoryParameters: Adjust logic to reserve space for output frames for hash partitioning. (We need one frame per partition.)

sql module:

1) Add sqlJoinAlgorithm context parameter; can be "broadcast" or "sortMerge". With native, it must always be "broadcast", or it's a validation error. MSQ supports both. Default is "broadcast" in both engines.
2) Validate that MSQ does not use broadcast join with RIGHT or FULL join, as results are not correct for broadcast join with those types. Allow this in native for two reasons: legacy (the docs caution against it, but it's always been allowed), and the fact that it actually *does* generate correct results in native when the join is processed on the Broker. It is much less likely that MSQ will plan in such a way that generates correct results.
3) Remove subquery penalty in DruidJoinQueryRel when using sort-merge join, because subqueries are always required, so there's no reason to penalize them.
4) Move previously-disabled join reordering and manipulation rules to FANCY_JOIN_RULES, and enable them when using sort-merge join. Helps get to better plans where projections and filters are pushed down.
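To make the KeyColumn change concrete, here is a minimal sketch; the constructor shape and the column name are illustrative assumptions, not code from this PR:

// One type now models both kinds of keys. Before this change, SortColumn
// could only express ascending vs. descending via a boolean.
KeyColumn sortKey = new KeyColumn("countryName", KeyOrder.ASCENDING);  // for sort-based shuffling
KeyColumn hashKey = new KeyColumn("countryName", KeyOrder.NONE);       // for hash-based shuffling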
This pull request introduces 1 alert and fixes 1 when merging 7b6c47a into 136322d - view on LGTM.com
This pull request fixes 1 alert when merging d2d56f3 into 83261f9 - view on LGTM.com
Only a few nit-level comments so far. I haven't gotten too far, but wanted to leave them as I will need to wander off and come back to this. The last thing I did was read through WorkerImpl. As a newbie to this side of the code, I didn't know if I should expect it to just be a whole bunch of wiring up of objects or if there should be actual logic to it. From what I saw, it seemed like it's all doing just wiring up of things. If that's expected, great. If there was supposed to be business logic inside of it that is essential for the sortMerge algorithm, it got lost on me in the overall scope of what WorkerImpl is doing.
@@ -198,13 +198,99 @@ The following table lists the context parameters for the MSQ task engine:
| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority. | 2 |
| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include: <ul><li>`max`: Uses as many tasks as possible, up to `maxNumTasks`.</li><li>`auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 10 GiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`.</li></ul> | `max` |
| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true |
| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` |
This sort of thing would normally be delivered via a hint in the SQL; perhaps it's not too hard to deliver it that way?
Unfortunately, hints are not available in the version of Calcite we use. Newer versions have this: https://calcite.apache.org/docs/reference.html#sql-hints
@abhishekagarwal87 found some reasons we couldn't do it right now, as referenced by this comment: #13153 (comment). @abhishekagarwal87 would you mind writing up your notes as to what blocks an upgrade, and making an issue about that, titled something like "Upgrade Calcite past <whatever version introduced the blocking problem>"? That way, we have an issue we can refer to and use to discuss possible ways to fix the blockers.
Once we sort that out, I'd like to deprecate the context parameter and move things to use hints (and eventually statistics as well) instead.
Making some progress here, described on #13532. It will require another Calcite release, so I think it's best if we merge this sort-merge join PR first, then do the Calcite upgrade and add hints.
stageDef.getShuffleSpec().clusterBy(),
stageDef.getShuffleSpec().doesAggregate()
You hate the word "get"/"is"? "getClusterBy()" "isDoesAggregate()"?
I don't love them for holder objects that don't "do" anything. Just personal preference I guess.
@@ -73,4 +74,9 @@
DruidNode selfNode();

Bouncer processorBouncer();

default File tempDir(int stageNumber, String id)
Nit brigade! I think we tend to use `tmp` in other parts of the code?
I was modeling it off `File tempDir()` that is already here without args. I can change both? This one doesn't matter much to me either way. Thoughts?
I like consistency. If it's always `tmp`, then it's much easier to say that next time it should be `tmp` as well.
public enum KeyOrder
{
  NONE(false),
Instead of `NONE` meaning "hashed", would it make sense to just call it `HASHED`?
Hmm. What I was thinking is that "none" means "not ordered" (in ref to the enum name KeyOrder). Like, a "none"-ordered thing might be used as a hash key, or it might not. I'll clarify this with a comment. If you still think it should be HASHED, let me know so I can think about it more.
In your description on this PR, I took away that adding NONE here allows you to use that to mean HASHED partitioning.
Maybe I'm not understanding the point of this class, but I think I assumed it would only be instantiated if someone was effecting an ordering/clustering/partitioning of some sort? I guess, even on the comment, what does it mean for the key to be used for non-sorting purposes, if the point of creating the object is for sort-type purposes? The answer left to me is "well, then it's used for clustering/partitioning" in which case, I don't know of an algorithm other than HASH, so seems like we can just name it how we expect it to be used and then ask ourselves what to do when we come up with a new usage that doesn't fit the name HASH?
OK, I thought about it more, so much more that it took a few months 😄
NONE actually does make sense rather than HASHED, since we can do hash partitioning along with sorting. (Meaning that ASCENDING or DESCENDING key order may still be involved in hash partitioning.)
I'll add comments discussing this.
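For reference, a sketch of what the commented enum might look like; `NONE(false)` appears in the diff above, while ASCENDING/DESCENDING and the `sortable()` accessor are inferred from this thread rather than copied from the source:

public enum KeyOrder
{
  // Not ordered. A key with this order may still be used for hash partitioning,
  // since hash partitioning can be done with or without sorting.
  NONE(false),
  ASCENDING(true),
  DESCENDING(true);

  private final boolean sortable;

  KeyOrder(final boolean sortable)
  {
    this.sortable = sortable;
  }

  public boolean sortable()
  {
    return sortable;
  }
}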
    cancellationId
);

inputSliceReader = new MapInputSliceReader(
Is this Map guaranteed to be used to read from all of the different types of slices? Is there a reason that it needs to be re-instantiated instead of just built once and reused?
It's built once per worker. It does have some special pieces that are specific to this worker, so it isn't shared across workers.
Thanks for taking a look! WorkerImpl is indeed meant to just wire things up and then let them rip. The join algorithm is entirely contained within SortMergeJoinFrameProcessor. The rest of the changes (the majority of the changes) are about adding hash-based shuffles to MSQ, which previously only included range-based shuffles.
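For orientation, a hedged sketch of the ShuffleKind idea from the PR description; GLOBAL_SORT and the "mix" spec come up later in this review, and the real enum may contain more constants than shown here:

public enum ShuffleKind
{
  MIX,         // mix everything into a single partition; no partitioning key
  HASH,        // hash partitioning on the key, whose columns may use KeyOrder.NONE
  GLOBAL_SORT  // range partitioning driven by gathered result key statistics
}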
This pull request fixes 1 alert when merging 4514d23 into b56855b - view on LGTM.com
I got through more, so a few more comments. It's mostly stream of consciousness stuff/thoughts as I read through things.
/**
 * Channels from {@link #inputs}. Two-element array: {@link #LEFT} and {@link #RIGHT}.
 */
private final List<ReadableFrameChannel> inputChannels;
Why not have 2 fields: a left and a right ReadableFrameChannel, instead of a List that you do `.get()` on all the time?
Or, given that you have multiple lists here, maybe what you need is a `JoinChannelContainer` or something like that which has a reference to all of `ReadableInput`, `ReadableFrameChannel`, `List<KeyColumn>`, and `Tracker`?
As to the lists vs. using 2 fields: I was thinking that someday this code would be extended to handle joining more than two channels at once.
As to the container thingy: I decided to put most of that stuff into the Tracker, which is already a kind of container. Except for `inputChannels`, because that's used as a return value for `inputChannels()`. It's a bit more readable, IMO.
while (!trackers.get(LEFT).needsMoreData()
    && !trackers.get(RIGHT).needsMoreData()
    && !(trackers.get(LEFT).isAtEnd() && trackers.get(RIGHT).isAtEnd())) {
This boolean is large... There's lots of negation. It might be nice to negate a large OR instead of 3 negated-ANDs, but if there's any way to make this boolean simpler, that might be nice.
I tried to simplify it. It looks like this now:
while (!allTrackersAreAtEnd()
&& !trackers.get(LEFT).needsMoreData()
&& !trackers.get(RIGHT).needsMoreData()) {
// Algorithm can proceed: not all trackers are at the end of their streams, and no tracker needs more data to
// read the current cursor or move it forward.
The comment is new too. It spells out the logic as well.
}

final int markCmp = compareMarks();
final boolean match = markCmp == 0 && !trackers.get(LEFT).hasPartiallyNullMark();
As I read this line, I'm unclear why "partially null" matters. Maybe it'll become clearer as I read more. I went back up to the class level javadoc and skimmed it one more time, but didn't see the relevance of null called out.
It's to achieve proper join semantics. I added a comment:
// Two rows match if the keys compare equal _and_ neither key has a null component. (x JOIN y ON x.a = y.a does
// not match rows where "x.a" is null.)
if (match && trackerWithCompleteSetForCurrentKey < 0) {
  for (int i = 0; i < inputChannels.size(); i++) {
    final Tracker tracker = trackers.get(i);
    if (tracker.hasCompleteSetForMark() || (pushNextFrame(i) && tracker.hasCompleteSetForMark())) {
`(pushNextFrame(i) && tracker.hasCompleteSetForMark())`
It's unclear to me why it is sufficient to only push a single next frame and check. This sort of thing is usually in a while-loop, no?
I adjusted some comments to hopefully clarify this.
if (currentHolder.cursor.isDone() && currentFrame + 1 < holders.size()) {
  currentFrame++;
  holders.get(currentFrame).cursor.reset();
}
This appears to never be letting go of the previous FrameHolder objects? It can effectively null out the old holder, right?
There's a `holders.clear()` in `markCurrent()`. We can't get rid of the old holders until the mark moves on. (The purpose of holding on to them is to enable us to collect a complete set of rows for the marked key.) I added comments to `holders` clarifying this.
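A minimal sketch of the retain-and-replay pattern being described, reusing names from the snippets above (`holders`, `cursor.reset()`, `isDone()`); `keyMatchesMark` and `emitJoinedRow` are hypothetical helpers, not the actual code:

// Replay every retained row whose key equals the marked key. Frames stay in
// "holders" until markCurrent() moves the mark on and calls holders.clear().
for (FrameHolder holder : holders) {
  holder.cursor.reset();  // rewind to the start of this retained frame
  while (!holder.cursor.isDone() && keyMatchesMark(holder)) {
    emitJoinedRow(holder);  // hypothetical: pair with the current row from the other side
    holder.cursor.advance();
  }
}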
final FrameHolder markHolder = holders.get(markFrame);
final int cmp = markHolder.comparisonWidget.compare(markRow, holder.comparisonWidget, row);

assert cmp <= 0;
How important is this assert? If it's truly important, you should perhaps have an if statement (that branch prediction should be able to do a really good job with)?
It was important to me during development 🙂
I replaced these with if statements.
// In default-value mode, null country number from the left-hand table converts to zero, which matches Australia.
countryCodeForNull = "AU";
countryNameForNull = "Australia";
countryNumberForNull = 0L;
I'm like 80% sure this assumption would be incorrect if the long column actually came from the nested column instead of the "normal" top-level long column. That is, it's perhaps less a function of the "default-value mode" and more a function of the column implementation that happens to be getting used.
Sure. Although, it's accurate as-is, because the null from the left-hand side really does convert to zero, since the left-hand side isn't using nested columns. Any suggestions on how to rewrite the comment to be better?
Okay, I've now read through all of the classes. Or, read them as best I can. I'm a newbie to this code, so the best I can do is a surface-level review, which is what I've done. Most of my comments are stream-of-consciousness because, I fear that I lack the context to really comment meaningfully.
I did find code that looked a bit more like business logic inside of the SortMergeJoinFrameProcessor
, so, I now know that there is business logic in there. But, I'm still left with the impression that I don't have a great handle on how that business logic works. I feel like even that class is almost all code that's just dealing with the framework (seems like it's mostly code to juggle asynchronous execution?) and it makes it hard for me, without the context of what is important and what is not, to really piece out where the business logic truly lives. I'm not sure if anything can be done about that, but that's the deepest level of commentary I can provide on this ;).
In terms of approving merge or not approving merge. I don't know how "safe" this is. It seems to touch a lot of different files and there does seem to be good test coverage, is this safe to just merge and it'll work? Is there anything extra done in the code to ensure that the old code paths are untouched in the worst case that there is a bug in here?
return memory.getByte(position) == StringFieldWriter.NULL_BYTE
    && memory.getByte(position + 1) == StringFieldWriter.VALUE_TERMINATOR
    && memory.getByte(position + 2) == StringFieldWriter.ROW_TERMINATOR;
Maybe either read a 3-byte array and test for equality or, if there's padding, you could read an int, mask away a byte and test equals.
There isn't necessarily going to be an extra byte of padding, and I'm not sure that reading a 3-byte array is going to be better. I'd prefer to leave this as-is.
@@ -58,6 +59,21 @@ public ClusterBy(
if (bucketByCount < 0 || bucketByCount > columns.size()) {
  throw new IAE("Invalid bucketByCount [%d]", bucketByCount);
}

// Key must be 100% sortable or 100% nonsortable. If empty, call it unsortable.
Is an empty key really unsortable? An empty key would effectively be equivalent to a constant value, and all data sets are sorted by a constant value.
After thinking about it a bit, I changed this so an empty key is considered sortable, since I agree that makes more sense.
Thanks, I still appreciate it.
The SQL planning changes are designed to be low risk when not using sort-merge joins. The new code is generally additive and the bulk of it will not execute for the default (broadcast) join algorithm. The MSQ and processing changes, other than the core sort-merge logic, are mostly related to adding hash partitioning. I tried to keep this as minimally invasive as possible, but it did have to touch various places and it could not be purely additive. To minimize the risk, I've added new tests. I will fix any problems that turn up.
@imply-cheddar I've pushed up changes responsive to your comments. Thanks for reviewing.
CodeQL found more than 10 potential problems in the proposed changes. Check the Files changed tab for more details.
Fixed a couple of unused parameters, and dismissed the rest, which looked like false positives. They were all about the
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
  if (dimensionSpec.getExtractionFn() != null || dimensionSpec.mustDecorate()) {
Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation
@Override
public int partitionCount()
{
  if (maxPartitions == 1) {
    return maxPartitions;
  } else {
    // Number of actual partitions may be less than maxPartitions.
    throw new IllegalStateException("Number of partitions not known.");
  }
}
This logic makes no sense to me? The partitionCount equals the maxPartitions if maxPartitions == 1? Doesn't that mean that partitionCount is either 1 or an exception?
Ah, this method is never actually called, because we never need to know the partition count ahead of time for global-sort shuffle specs. This is a relic of a past version of the patch where global-sort with maxPartitions = 1 was a way of encoding "mix everything into a single partition". Now, there's an explicit "mix" spec, so this isn't needed any longer.
I replaced the entire thing with a `throw`.
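Presumably along these lines (the exact message is an assumption):

@Override
public int partitionCount()
{
  // Global-sort shuffle specs never need to know the partition count up front.
  throw new IllegalStateException("Number of partitions not known ahead of time.");
}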
if (clusterBy.getBucketByCount() > 0) {
  // Only GlobalSortTargetSizeShuffleSpec supports bucket-by.
  throw new IAE("Cannot bucket with %s partitioning", TYPE);
}
Why does this spec have to validate for something that is only supported by something else? Is there no way to make it so that the only thing that supports it is the only thing that takes it?
Additionally, if you were to interpolate the `clusterBy` into the error message, would it generate something that tells the user which part of their query is doing weird things? (I'm imagining a complex query with lots of WITH and sub-queries, it can be hard to figure out which one is acting weird sometimes)
Is there no way to make it so that the only thing that supports it is the only thing that takes it?
It could be done. It makes this code cleaner but then makes other code elsewhere dirtier. On balance I think it's better this way.
Additionally, if you were to interpolate the clusterBy into the error message, would it generate something that tells the user which part of their query is doing weird things?
If the planner is non-buggy then there would be no way a user sees this. (Unless they write their own MSQ spec, without going through SQL.) That being said, I think we might as well include `clusterBy` here to help debug any potential planner bugs. I added it.
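So the check above presumably becomes something like the following (the exact format string is assumed):

if (clusterBy.getBucketByCount() > 0) {
  // Only GlobalSortTargetSizeShuffleSpec supports bucket-by. Include the
  // clusterBy to help debug any potential planner bugs.
  throw new IAE("Cannot bucket with %s partitioning given [%s]", TYPE, clusterBy);
}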
@Override
public Either<Long, ClusterByPartitions> generatePartitionsForGlobalSort(
    @Nullable ClusterByStatisticsCollector collector,
    int maxNumPartitions
)
{
  throw new IllegalStateException("Not a global sort");
}
Can the global sort methods exist only on the things that do global sort? One of the things that sucks the most about the AggregatorFactory and other interfaces is the number of methods that can be ignored and/or are only special purpose; it would be nice to keep special things relegated to their own special location if at all possible.
Hmm. Yes. I think it would need some kind of `.class` or casting stuff.
I rejiggered things to have a specialized GlobalSortShuffleSpec interface that only the global-sort stuff implements. Calling code knows to check for this interface if the kind is `GLOBAL_SORT`. I think it's a bit cleaner. Thanks for the suggestion.
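A sketch of the resulting shape, based on the signature quoted above; the `kind()` accessor on ShuffleSpec is an assumption:

// Only global-sort specs implement this specialized interface.
interface GlobalSortShuffleSpec extends ShuffleSpec
{
  Either<Long, ClusterByPartitions> generatePartitionsForGlobalSort(
      @Nullable ClusterByStatisticsCollector collector,
      int maxNumPartitions
  );
}

// Calling code checks the kind before casting:
if (shuffleSpec.kind() == ShuffleKind.GLOBAL_SORT) {
  partitions = ((GlobalSortShuffleSpec) shuffleSpec)
      .generatePartitionsForGlobalSort(collector, maxNumPartitions);
}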
/**
 * Returns the {@link ShuffleSpec} for this stage, if {@link #doesShuffle()}.
 *
 * @throws IllegalStateException if this stage does not shuffle
 */
public ShuffleSpec getShuffleSpec()
{
  if (shuffleSpec == null) {
    throw new IllegalStateException("Stage does not shuffle");
  }

  return shuffleSpec;
}
Is there a way to know that the stage shuffles or not before calling this method (to avoid the ISE)?
I see changes to `doesSortDuringShuffle`, but that seems to indicate more than just "does the current stage shuffle".
Yes: `doesShuffle()`.
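So callers can guard the access, along these lines:

if (stageDef.doesShuffle()) {
  // Safe: getShuffleSpec() only throws when the stage does not shuffle.
  final ShuffleSpec shuffleSpec = stageDef.getShuffleSpec();
}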
if (!mustGatherResultKeyStatistics()) {
  throw new ISE("No statistics needed for stage[%d]", getStageNumber());
}

return ClusterByStatisticsCollectorImpl.create(
    shuffleSpec.clusterBy(),
    signature,
    maxRetainedBytes,
    PARTITION_STATS_MAX_BUCKETS,
    shuffleSpec.doesAggregate(),
    shuffleCheckHasMultipleValues
);
I'm not sure of the semantics of "mustGatherStatistics", but "must gather" and "might choose to gather if the user asked nicely" could be seen as different things. If a user asked for full statistics over their run, would the "mustGather" method end up returning true?
It's specifically talking about gathering result key statistics for the purposes of generating partitions for the global sort. (It's populating sketches to figure out range cut points.) The user doesn't enter into it really. I updated some javadoc to hopefully be more clear.
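In other words, roughly this usage (a sketch; the caller-side method name is hypothetical):

if (stageDef.mustGatherResultKeyStatistics()) {
  // Workers populate sketches of the result keys so the controller can choose
  // range cut points for the global sort.
  final ClusterByStatisticsCollector collector =
      stageDef.createResultKeyStatisticsCollector(maxRetainedBytes);  // hypothetical name
}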
private void onNextIteration(final Runnable runnable)
{
  if (nextIterationRunnable != null) {
    throw new ISE("postAdvanceRunnable already set");
Is there any extra information that can be interpolated or anything in here to try to help the person who runs into this exception figure out what is going on?
Not that I can think of. If this ever happens, it's a bug in the class and someone's gonna have a fun time in the debugger.
// Key must be 100% sortable or 100% nonsortable. If empty, call it sortable.
boolean sortable = true;

for (int i = 0; i < columns.size(); i++) {
  final KeyColumn column = columns.get(i);

  if (i == 0) {
    sortable = column.order().sortable();
  } else if (sortable != column.order().sortable()) {
    throw new IAE("Cannot mix sortable and unsortable key columns");
  }
}

this.sortable = sortable;
Code nit, could also do:
if (columns.isEmpty()) {
  // No columns means that it's sortable/sorted, as they are effectively all sorted by the same non-existent, constant value.
  this.sortable = true;
} else {
  boolean sortable = columns.get(0).order().sortable();
  for (int i = 1; i < columns.size(); ++i) {
    if (sortable != columns.get(i).order().sortable()) {
      throw new IAE("Cannot mix sortable and unsortable key columns");
    }
  }
  this.sortable = sortable;
}
I kept it as-is since they both seem equally good to me and I'm lazy 🙂
Throwing an approval on this. I'm not certain that this has no problems (I don't have the context on the general framework to fully understand all of the ins-and-outs), but I'm confident that it's pretty well set up to only impact the MSQ-side of the house so it's not going to impact things outside of that. Gonna trust that the tests provide enough coverage that it's safe to merge (and that if/as we discover issues, we will resolve them).