Sort-merge join and hash shuffles for MSQ. #13506

Merged
gianm merged 29 commits into apache:master from msq-sort-merge-join on Mar 8, 2023

Conversation

Contributor

@gianm gianm commented Dec 6, 2022

Adds a distributed sort-merge join algorithm for MSQ.

The main changes are in the processing, multi-stage-query, and sql modules.

processing module:

  1. Rename SortColumn to KeyColumn, replace boolean descending with KeyOrder.
    This makes it nicer to model hash keys, which use KeyOrder.NONE.

  2. Add nullability checkers to the FieldReader interface, and an
    "isPartiallyNullKey" method to FrameComparisonWidget. The join
    processor uses this to detect null keys.

  3. Add WritableFrameChannel.isClosed and OutputChannel.isReadableChannelReady
    so callers can tell which OutputChannels are ready for reading and which
    aren't.

  4. Specialize FrameProcessors.makeCursor to return FrameCursor, a random-access
    implementation. The join processor uses this to rewind when it needs to
    replay a set of rows with a particular key.

  5. Add MemoryAllocatorFactory, which is embedded inside FrameWriterFactory
    instead of a particular MemoryAllocator. This allows FrameWriterFactory
    to be shared in more scenarios.

multi-stage-query module:

  1. ShuffleSpec: Add hash-based shuffles. New enum ShuffleKind helps callers
    figure out what kind of shuffle is happening. The change from SortColumn
    to KeyColumn allows ClusterBy to be used for both hash-based and sort-based
    shuffling.

  2. WorkerImpl: Add ability to handle hash-based shuffles. Refactor the logic
    to be more readable by moving the work-order-running code to the inner
    class RunWorkOrder, and the shuffle-pipeline-building code to the inner
    class ShufflePipelineBuilder.

  3. Add SortMergeJoinFrameProcessor and factory.

  4. WorkerMemoryParameters: Adjust logic to reserve space for output frames
    for hash partitioning. (We need one frame per partition.)
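
As a rough illustration of why one frame per partition is needed: a hash shuffle routes each row to a partition by hashing its key, and the worker buffers each partition's rows in that partition's in-progress output frame. Minimal sketch with hypothetical names (not the actual MSQ code):

    import java.util.Arrays;

    class HashShuffleSketch
    {
      // Route a row to a partition by hashing its key bytes. Clearing the
      // sign bit keeps the modulo result nonnegative.
      static int assignPartition(final byte[] keyBytes, final int numPartitions)
      {
        return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
      }
    }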

sql module:

  1. Add sqlJoinAlgorithm context parameter; can be "broadcast" or
    "sortMerge". With native, it must always be "broadcast", or it's a
    validation error. MSQ supports both. Default is "broadcast" in
    both engines. (An example request is shown after this list.)

  2. Validate that MSQs do not use broadcast join with RIGHT or FULL join,
    as results are not correct for broadcast join with those types. Allow
    this in native for two reasons: legacy (the docs caution against it,
    but it's always been allowed), and the fact that it actually does
    generate correct results in native when the join is processed on the
    Broker. It is much less likely that MSQ will plan in such a way that
    generates correct results.

  3. Remove subquery penalty in DruidJoinQueryRel when using sort-merge
    join, because subqueries are always required, so there's no reason
    to penalize them.

  4. Move previously-disabled join reordering and manipulation rules to
    FANCY_JOIN_RULES, and enable them when using sort-merge join. Helps
    get to better plans where projections and filters are pushed down.

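For reference, the sqlJoinAlgorithm parameter travels in the standard SQL query context; a request to the SQL API could look like this (query and table names hypothetical):

    {
      "query": "SELECT c.name, o.total FROM orders o JOIN customers c ON o.customer_id = c.id",
      "context": {
        "sqlJoinAlgorithm": "sortMerge"
      }
    }
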
@gianm gianm added the Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 label Dec 6, 2022

lgtm-com bot commented Dec 6, 2022

This pull request introduces 1 alert and fixes 1 when merging 7b6c47a into 136322d - view on LGTM.com

new alerts:

  • 1 for Spurious Javadoc @param tags

fixed alerts:

  • 1 for Uncontrolled data used in path expression



lgtm-com bot commented Dec 7, 2022

This pull request fixes 1 alert when merging d2d56f3 into 83261f9 - view on LGTM.com

fixed alerts:

  • 1 for Uncontrolled data used in path expression


Contributor

@imply-cheddar imply-cheddar left a comment

Only just a few nit-level comments so far. I haven't gotten too far, but wanted to leave them as I will need to wander off and come back to this. The last thing I did was read through WorkerImpl, as a newbie to this side of the code, I didn't know if I should expect it to just be a whole bunch of wiring up of objects or if there should be actual logic to it. From what I saw, it seemed like it's all doing just wiring up of things. If that's expected, great. If there was supposed to be business logic inside of it that is essential for the sortMerge algorithm, it got lost on me in the overall scope of what WorkerImpl is doing.

@@ -198,13 +198,99 @@ The following table lists the context parameters for the MSQ task engine:
| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority.| 2 |
| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include: <ul><li>`max`: Uses as many tasks as possible, up to `maxNumTasks`.</li><li>`auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 10 GiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`.</li></ul> | `max` |
| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true |
| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` |
Contributor

This sort of thing would normally be delivered via a hint in the SQL, perhaps it's not too hard to deliver it that way?

Contributor Author

@gianm gianm Dec 7, 2022

Unfortunately, hints are not available in the version of Calcite we use. Newer versions have this: https://calcite.apache.org/docs/reference.html#sql-hints

@abhishekagarwal87 found some reasons we couldn't do it right now, as referenced by this comment: #13153 (comment). @abhishekagarwal87 would you mind writing up your notes as to what blocks an upgrade, and making an issue about that, titled something like Upgrade Calcite past <whatever version introduced the blocking problem>? That way, we have an issue we can refer to and use to discuss possible ways to fix the blockers.

Once we sort that out, I'd like to deprecate the context parameter and move things to use hints (and eventually statistics as well) instead.
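
For a rough sense of what a hint-based approach could look like once the upgrade lands (hint name purely hypothetical; the exact syntax depends on the Calcite version we land on):

    SELECT /*+ sort_merge_join */ o.total, c.name
    FROM orders o
    JOIN customers c ON o.customer_id = c.id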

Contributor

@gianm - Here it is. #13532.

Contributor Author

Making some progress here, described on #13532. It will require another Calcite release, so I think it's best if we merge this sort-merge join PR first, then do the Calcite upgrade and add hints.

Comment on lines +599 to +600
stageDef.getShuffleSpec().clusterBy(),
stageDef.getShuffleSpec().doesAggregate()
Contributor

You hate the word "get"/"is"? "getClusterBy()" "isDoesAggregate()"?

Contributor Author

I don't love them for holder objects that don't "do" anything. Just personal preference I guess.

@@ -73,4 +74,9 @@
DruidNode selfNode();

Bouncer processorBouncer();

default File tempDir(int stageNumber, String id)
Contributor

Nit brigade! I think we tend to use tmp in other parts of the code?

Contributor Author

I was modeling it off File tempDir() that is already here without args. I can change both? This one doesn't matter much to me either way. Thoughts?

Contributor

I like consistency. If it's always tmp then it's much easier to say that next time it should be tmp as well.


public enum KeyOrder
{
NONE(false),
Contributor

Instead of NONE meaning "hashed", would it make sense to just call it HASHED?

Contributor Author

Hmm. What I was thinking is not that "none" means "hashed", but that "none" means "not ordered" (in ref to the enum name KeyOrder). Like, a "none"-ordered thing might be used as a hash code, or it might not. I'll clarify this with a comment. If you still think it should be HASHED, let me know so I can think about it more.

Contributor

In your description on this PR, I took away that adding NONE here allows you to use that to mean HASHED partitioning.

Maybe I'm not understanding the point of this class, but I think I assumed it would only be instantiated if someone was effecting an ordering/clustering/partitioning of some sort? I guess, even on the comment, what does it mean for the key to be used for non-sorting purposes, if the point of creating the object is for sort-type purposes? The answer left to me is "well, then it's used for clustering/partitioning" in which case, I don't know of an algorithm other than HASH, so seems like we can just name it how we expect it to be used and then ask ourselves what to do when we come up with a new usage that doesn't fit the name HASH?

Contributor Author

OK, I thought about it more, so much more that it took a few months 😄

NONE actually does make sense rather than HASHED, since we can do hash partitioning along with sorting. (Meaning that ASCENDING or DESCENDING key order may still be involved in hash partitioning.)

I'll add comments discussing this.
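
For context, here is a sketch of the enum with the clarified semantics (comments paraphrased, not the exact source):

    public enum KeyOrder
    {
      // No defined ordering. Such a key may still be used for hash
      // partitioning, or it may not be used for any partitioning at all.
      NONE(false),

      // Ascending order. May be used for sorting, hash partitioning, or both.
      ASCENDING(true),

      // Descending order. May be used for sorting, hash partitioning, or both.
      DESCENDING(true);

      private final boolean sortable;

      KeyOrder(final boolean sortable)
      {
        this.sortable = sortable;
      }

      public boolean sortable()
      {
        return sortable;
      }
    }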

cancellationId
);

inputSliceReader = new MapInputSliceReader(
Contributor

Is this Map guaranteed to be used to read from all of the different types of slices? Is there a reason that it needs to be re-instantiated instead of just built once and reused?

Contributor Author

It's built once per worker. It does have some special pieces that are specific to this worker, so it isn't shared across workers.

Contributor Author

gianm commented Dec 7, 2022

> Only just a few nit-level comments so far. I haven't gotten too far, but wanted to leave them as I will need to wander off and come back to this. The last thing I did was read through WorkerImpl, as a newbie to this side of the code, I didn't know if I should expect it to just be a whole bunch of wiring up of objects or if there should be actual logic to it. From what I saw, it seemed like it's all doing just wiring up of things. If that's expected, great. If there was supposed to be business logic inside of it that is essential for the sortMerge algorithm, it got lost on me in the overall scope of what WorkerImpl is doing.

Thanks for taking a look!

WorkerImpl is indeed meant to just wire things up and then let them rip.

The join algorithm is entirely contained within SortMergeJoinFrameProcessor. The rest of the changes (the majority of the changes) are about adding hash-based shuffles to MSQ, which previously only included range-based shuffles.
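
For orientation, a minimal sketch of the classic sort-merge inner-join loop over two key-sorted inputs (hypothetical Row type; the real SortMergeJoinFrameProcessor additionally deals with frames, asynchronous channel readiness, null keys, and outer-join modes):

    import java.util.List;
    import java.util.function.BiConsumer;

    class SortMergeJoinSketch
    {
      record Row(String key, Object value) {}

      static void join(final List<Row> left, final List<Row> right, final BiConsumer<Row, Row> emit)
      {
        int l = 0;
        int r = 0;
        while (l < left.size() && r < right.size()) {
          final int cmp = left.get(l).key().compareTo(right.get(r).key());
          if (cmp < 0) {
            l++; // left side is behind; advance it
          } else if (cmp > 0) {
            r++; // right side is behind; advance it
          } else {
            // Keys match: emit the cross product of the two runs sharing this key.
            final String key = left.get(l).key();
            final int rStart = r;
            while (l < left.size() && left.get(l).key().equals(key)) {
              for (r = rStart; r < right.size() && right.get(r).key().equals(key); r++) {
                emit.accept(left.get(l), right.get(r));
              }
              l++;
            }
          }
        }
      }
    }

The FrameCursor rewinding mentioned in the description exists to make that replay over the right-hand run possible when rows arrive packed into frames rather than an in-memory list.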


lgtm-com bot commented Dec 8, 2022

This pull request fixes 1 alert when merging 4514d23 into b56855b - view on LGTM.com

fixed alerts:

  • 1 for Uncontrolled data used in path expression


Contributor

@imply-cheddar imply-cheddar left a comment

I got through more, so a few more comments. It's mostly stream of consciousness stuff/thoughts as I read through things.

/**
* Channels from {@link #inputs}. Two-element array: {@link #LEFT} and {@link #RIGHT}.
*/
private final List<ReadableFrameChannel> inputChannels;
Contributor

Why not have 2 fields: a left and a right ReadableFrameChannel, instead of a List that you do .get() on all the time?

Contributor

Or, given that you have multiple lists here. Maybe what you need is a JoinChannelContainer or something like that which has a reference to all of ReadableInput, ReadableFrameChannel, List<KeyColumn>, Tracker?

Contributor Author

As to the lists vs. using 2 fields: I was thinking that someday this code would be extended to handle joining more than two channels at once.

As to the container thingy: I decided to put most of that stuff into the Tracker, which is already a kind of container. Except for inputChannels, because that's used as a return value for inputChannels(). It's a bit more readable, IMO.


while (!trackers.get(LEFT).needsMoreData()
&& !trackers.get(RIGHT).needsMoreData()
&& !(trackers.get(LEFT).isAtEnd() && trackers.get(RIGHT).isAtEnd())) {
Contributor

This boolean is large... There's lots of negation. It might be nice to negate a large OR instead of 3 negated-ANDs, but if there's any way to make this boolean simpler, that might be nice.

Contributor Author

I tried to simplify it. It looks like this now:

    while (!allTrackersAreAtEnd()
           && !trackers.get(LEFT).needsMoreData()
           && !trackers.get(RIGHT).needsMoreData()) {
      // Algorithm can proceed: not all trackers are at the end of their streams, and no tracker needs more data to
      // read the current cursor or move it forward.

The comment is new too. It spells out the logic as well.

}

final int markCmp = compareMarks();
final boolean match = markCmp == 0 && !trackers.get(LEFT).hasPartiallyNullMark();
Contributor

As I read this line, I'm unclear why "partially null" matters. Maybe it'll become clearer as I read more. I went back up to the class level javadoc and skimmed it one more time, but didn't see the relevance of null called out.

Contributor Author

It's to achieve proper join semantics. I added a comment:

      // Two rows match if the keys compare equal _and_ neither key has a null component. (x JOIN y ON x.a = y.a does
      // not match rows where "x.a" is null.)
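
In SQL terms (standard join semantics; tables hypothetical):

    -- Suppose x.a contains (NULL, 1) and y.a contains (NULL, 1).
    -- NULL = NULL does not evaluate to TRUE, so only the rows with a = 1 match:
    SELECT x.a FROM x JOIN y ON x.a = y.a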

if (match && trackerWithCompleteSetForCurrentKey < 0) {
  for (int i = 0; i < inputChannels.size(); i++) {
    final Tracker tracker = trackers.get(i);
    if (tracker.hasCompleteSetForMark() || (pushNextFrame(i) && tracker.hasCompleteSetForMark())) {
Contributor

(pushNextFrame(i) && tracker.hasCompleteSetForMark())

It's unclear to me why it is sufficient to only push a single next frame and check. This sort of thing is usually in a while-loop, no?

Contributor Author

I adjusted some comments to hopefully clarify this.

Comment on lines +546 to +549
if (currentHolder.cursor.isDone() && currentFrame + 1 < holders.size()) {
  currentFrame++;
  holders.get(currentFrame).cursor.reset();
}
Contributor

This appears to never be letting go of the previous FrameHolder objects? It can effectively null out the old holder, right?

Contributor Author

There's a holders.clear() in markCurrent(). We can't get rid of the old holders until the mark moves on. (The purpose of holding on to them is to enable us to collect a complete set of rows for the marked key.)

I added comments to holders clarifying this.

final FrameHolder markHolder = holders.get(markFrame);
final int cmp = markHolder.comparisonWidget.compare(markRow, holder.comparisonWidget, row);

assert cmp <= 0;
Contributor

how important is this assert? If it's truly important, you should perhaps have an if statement (that branch prediction should be able to do a really good job with)?

Contributor Author

It was important to me during development 🙂

I replaced these with if statements.

Comment on lines +747 to +750
// In default-value mode, null country number from the left-hand table converts to zero, which matches Australia.
countryCodeForNull = "AU";
countryNameForNull = "Australia";
countryNumberForNull = 0L;
Contributor

I'm like 80% sure this assumption would be incorrect if the long column actually came from the nested column instead of the "normal" top-level long column. That is, it's perhaps less a function of the "default-value mode" and more a function of the column implementation that happens to be getting used.

Contributor Author

Sure. Although, it's accurate as-is, because the null from the left-hand side really does convert to zero, since the left-hand side isn't using nested columns. Any suggestions on how to rewrite the comment to be better?

Contributor

@imply-cheddar imply-cheddar left a comment

Okay, I've now read through all of the classes. Or, read them as best I can. I'm a newbie to this code, so the best I can do is a surface-level review, which is what I've done. Most of my comments are stream-of-consciousness because, I fear that I lack the context to really comment meaningfully.

I did find code that looked a bit more like business logic inside of the SortMergeJoinFrameProcessor, so, I now know that there is business logic in there. But, I'm still left with the impression that I don't have a great handle on how that business logic works. I feel like even that class is almost all code that's just dealing with the framework (seems like it's mostly code to juggle asynchronous execution?) and it makes it hard for me, without the context of what is important and what is not, to really piece out where the business logic truly lives. I'm not sure if anything can be done about that, but that's the deepest level of commentary I can provide on this ;).

In terms of approving merge or not approving merge. I don't know how "safe" this is. It seems to touch a lot of different files and there does seem to be good test coverage, is this safe to just merge and it'll work? Is there anything extra done in the code to ensure that the old code paths are untouched in the worst case that there is a bug in here?

Comment on lines 93 to 95
return memory.getByte(position) == StringFieldWriter.NULL_BYTE
&& memory.getByte(position + 1) == StringFieldWriter.VALUE_TERMINATOR
&& memory.getByte(position + 2) == StringFieldWriter.ROW_TERMINATOR;
Contributor

Maybe either read a 3-byte array and test for equality or, if there's padding, you could read an int, mask away a byte and test equals.

Contributor Author

There isn't necessarily going to be an extra byte of padding, and I'm not sure that reading a 3-byte array is going to be better. I'd prefer to leave this as-is.

@@ -58,6 +59,21 @@ public ClusterBy(
if (bucketByCount < 0 || bucketByCount > columns.size()) {
  throw new IAE("Invalid bucketByCount [%d]", bucketByCount);
}

// Key must be 100% sortable or 100% nonsortable. If empty, call it unsortable.
Contributor

Is an empty key really unsortable? An empty key would effectively be equivalent to a constant value, and all data sets are sorted by a constant value.

Contributor Author

After thinking about it a bit, I changed this so an empty key is considered sortable, since I agree that makes more sense.


Contributor Author

gianm commented Feb 22, 2023

> Okay, I've now read through all of the classes. Or, read them as best I can. I'm a newbie to this code, so the best I can do is a surface-level review, which is what I've done. Most of my comments are stream-of-consciousness because, I fear that I lack the context to really comment meaningfully.

Thanks, I still appreciate it.

> In terms of approving merge or not approving merge. I don't know how "safe" this is. It seems to touch a lot of different files and there does seem to be good test coverage, is this safe to just merge and it'll work? Is there anything extra done in the code to ensure that the old code paths are untouched in the worst case that there is a bug in here?

The SQL planning changes are designed to be low risk when not using sort-merge joins. The new code is generally additive and the bulk of it will not execute for the default (broadcast) join algorithm.

The MSQ and processing changes, other than the core sort-merge logic, are mostly related to adding hash partitioning. I tried to keep this as minimally invasive as possible, but it did have to touch various places and it could not be purely additive. To minimize the risk, I've added new tests. I will fix any problems that turn up.

Contributor Author

gianm commented Feb 22, 2023

@imply-cheddar I've pushed up changes responsive to your comments. Thanks for reviewing.

@github-advanced-security github-advanced-security bot left a comment

CodeQL found more than 10 potential problems in the proposed changes. Check the Files changed tab for more details.

Contributor Author

gianm commented Feb 23, 2023

> CodeQL found more than 10 potential problems in the proposed changes. Check the Files changed tab for more details.

Fixed a couple of unused parameters, and dismissed the rest, which looked like false positives. They were all about the partition field in FrameChannelHashPartitioner, which comes from HashPartitionVirtualColumn.

@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
  if (dimensionSpec.getExtractionFn() != null || dimensionSpec.mustDecorate()) {

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [DimensionSpec.getExtractionFn](1) should be avoided because it has been deprecated.
Comment on lines 118 to 127
@Override
public int partitionCount()
{
  if (maxPartitions == 1) {
    return maxPartitions;
  } else {
    // Number of actual partitions may be less than maxPartitions.
    throw new IllegalStateException("Number of partitions not known.");
  }
}
Contributor

This logic makes no sense to me? The partitionCount equals the maxPartitions if maxPartitions == 1? Doesn't that mean that partitionCount is either 1 or an exception?

Contributor Author

Ah, this method is never actually called, because we never need to know the partition count ahead of time for global-sort shuffle specs. This is a relic of a past version of the patch where global-sort with maxPartitions = 1 was a way of encoding "mix everything into a single partition". Now, there's an explicit "mix" spec, so this isn't needed any longer.

I replaced the entire thing with a throw.

Comment on lines 48 to 51
if (clusterBy.getBucketByCount() > 0) {
  // Only GlobalSortTargetSizeShuffleSpec supports bucket-by.
  throw new IAE("Cannot bucket with %s partitioning", TYPE);
}
Contributor

Why does this spec have to validate for something that is only supported by something else? Is there no way to make it so that the only thing that supports it is the only thing that takes it?

Additionally, if you were to interpolate the clusterBy into the error message, would it generate something that tells the user which part of their query is doing weird things? (I'm imagining a complex query with lots of WITH and sub-queries, it can be hard to figure out which one is acting weird sometimes)

Contributor Author

Is there no way to make it so that the only thing that supports it is the only thing that takes it?

It could be done. It makes this code cleaner but then makes other code elsewhere dirtier. On balance I think it's better this way.

Additionally, if you were to interpolate the clusterBy into the error message, would it generate something that tells the user which part of their query is doing weird things?

If the planner is non-buggy then there would be no way a user sees this. (Unless they write their own MSQ spec, without going through SQL.) That being said, I think we might as well include clusterBy here to help debug any potential planner bugs. I added it.

Comment on lines 79 to 86
@Override
public Either<Long, ClusterByPartitions> generatePartitionsForGlobalSort(
    @Nullable ClusterByStatisticsCollector collector,
    int maxNumPartitions
)
{
  throw new IllegalStateException("Not a global sort");
}
Contributor

Can the global sort methods exist only on the things that do global sort? One of the things that sucks the most about the AggregatorFactory and other interfaces is the number of methods that can be ignored and/or are only special purpose, would be nice to keep special things relegated to their own special location if at all possible.

Contributor Author

Hmm. Yes. I think it would need some kind of .class or casting stuff.

I rejiggered things to have a specialized GlobalSortShuffleSpec interface that only the global-sort stuff implements. Calling code knows to check for this interface if the kind is GLOBAL_SORT. I think it's a bit cleaner. Thanks for the suggestion.

Comment on lines +229 to 241
/**
 * Returns the {@link ShuffleSpec} for this stage, if {@link #doesShuffle()}.
 *
 * @throws IllegalStateException if this stage does not shuffle
 */
public ShuffleSpec getShuffleSpec()
{
  if (shuffleSpec == null) {
    throw new IllegalStateException("Stage does not shuffle");
  }

  return shuffleSpec;
}
Contributor

Is there a way to know that the stage shuffles or not before calling this method (to avoid the ISE)?

I see changes to doesSortDuringShuffle, but that seems to indicate more than just "does the current stage shuffle".

Contributor Author

@gianm gianm Mar 7, 2023

Yes: doesShuffle()
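
That is, callers guard the accessor (a sketch using the method names from this PR):

    if (stageDef.doesShuffle()) {
      // Safe: doesShuffle() returning true means getShuffleSpec() will not throw.
      final ShuffleSpec shuffleSpec = stageDef.getShuffleSpec();
    }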

Comment on lines 348 to 359
if (!mustGatherResultKeyStatistics()) {
  throw new ISE("No statistics needed for stage[%d]", getStageNumber());
}

return ClusterByStatisticsCollectorImpl.create(
    shuffleSpec.clusterBy(),
    signature,
    maxRetainedBytes,
    PARTITION_STATS_MAX_BUCKETS,
    shuffleSpec.doesAggregate(),
    shuffleCheckHasMultipleValues
);
Contributor

I'm not sure of the semantics of "mustGatherStatistics", but "must gather" and "might choose to gather if the user asked nicely" could be seen as different things. If a user asked for full statistics over their run, would the "mustGather" method end up returning true?

Contributor Author

It's specifically talking about gathering result key statistics for the purposes of generating partitions for the global sort. (It's populating sketches to figure out range cut points.) The user doesn't enter into it really. I updated some javadoc to hopefully be more clear.

private void onNextIteration(final Runnable runnable)
{
  if (nextIterationRunnable != null) {
    throw new ISE("postAdvanceRunnable already set");
Contributor

Is there any extra information that can be interpolated or anything in here to try to help the person who runs into this exception figure out what is going on?

Contributor Author

@gianm gianm Mar 7, 2023

Not that I can think of. If this ever happens, it's a bug in the class and someone's gonna have a fun time in the debugger.

Comment on lines +63 to +76
// Key must be 100% sortable or 100% nonsortable. If empty, call it sortable.
boolean sortable = true;

for (int i = 0; i < columns.size(); i++) {
  final KeyColumn column = columns.get(i);

  if (i == 0) {
    sortable = column.order().sortable();
  } else if (sortable != column.order().sortable()) {
    throw new IAE("Cannot mix sortable and unsortable key columns");
  }
}

this.sortable = sortable;
Contributor

Code nit, could also do:

if (columns.isEmpty()) {
  // No columns means it's sortable/sorted, as everything is effectively sorted
  // by the same nonexistent, constant value.
  this.sortable = true;
} else {
  final boolean sortable = columns.get(0).order().sortable();
  for (int i = 1; i < columns.size(); ++i) {
    if (sortable != columns.get(i).order().sortable()) {
      throw new IAE("Cannot mix sortable and unsortable key columns");
    }
  }
  this.sortable = sortable;
}

Contributor Author

I kept it as-is since they both seem equally good to me and I'm lazy 🙂

Contributor

@imply-cheddar imply-cheddar left a comment

Throwing an approval on this. I'm not certain that this has no problems (I don't have the context on the general framework to fully understand all of the ins-and-outs), but I'm confident that it's pretty well set up to only impact the MSQ-side of the house so it's not going to impact things outside of that. Gonna trust that the tests provide enough coverage that it's safe to merge (and that if/as we discover issues, we will resolve them).

@gianm gianm merged commit 82f7a56 into apache:master Mar 8, 2023
@gianm gianm deleted the msq-sort-merge-join branch March 8, 2023 22:19
317brian pushed a commit to 317brian/druid that referenced this pull request Mar 10, 2023
* Sort-merge join and hash shuffles for MSQ.

* Work around compiler problem.

* Updates from static analysis.

* Fix @param tag.

* Fix declared exception.

* Fix spelling.

* Minor adjustments.

* wip

* Merge fixups

* fixes

* Fix CalciteSelectQueryMSQTest

* Empty keys are sortable.

* Address comments from code review. Rename mux -> mix.

* Restore inspection config.

* Restore original doc.

* Reorder imports.

* Adjustments

* Fix.

* Fix imports.

* Adjustments from review.

* Update header.

* Adjust docs.
@clintropolis clintropolis added this to the 26.0 milestone Apr 10, 2023