Relax join keys constraint from Column to any physical expression for physical join operators #8991
Conversation
Thank you @viirya -- I went through this PR carefully and it makes sense to me. In many ways I think it is cleaner now, as the Joins no longer handle Columns specially and instead just have PhysicalExprs.
Given this is step one of likely several steps needed to support using DataFusion for Spark Joins, I wonder if we should create an EPIC-style ticket that lists known steps (e.g. "investigate removing projection before join inputs").
@jackwener and @liukun4515 and @mustafasrepo perhaps you have some time to review this PR as well.
Also, @korowa is another of our resident Join experts; perhaps he has some time to weigh in.
BTW for context, I believe this feature is in support of the native spark execution engine apache/datafusion-comet#1 which is in the process of being donated to Apache.
I am running some basic benchmarks now to confirm this change doesn't impact performance (I don't expect that it does).
```diff
@@ -278,7 +277,7 @@ pub struct HashJoinExec {
     /// right (probe) side which are filtered by the hash table
     pub right: Arc<dyn ExecutionPlan>,
     /// Set of equijoin columns from the relations: `(left_col, right_col)`
-    pub on: Vec<(Column, Column)>,
+    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
```
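The field type change above is the heart of the PR: join keys go from concrete `Column` pairs to trait objects. A rough sketch of what that buys (using hypothetical, heavily simplified stand-ins for DataFusion's `PhysicalExpr` trait, not the real API) — the relaxed field can now carry an expression key like the `l_col + 1 = r_col + 2` example from the PR description:

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins for DataFusion's PhysicalExpr trait
// and expression types -- for illustration only, not the real API.
trait PhysicalExpr {
    fn describe(&self) -> String;
}

type PhysicalExprRef = Arc<dyn PhysicalExpr>;

struct Column {
    name: String,
}
impl PhysicalExpr for Column {
    fn describe(&self) -> String {
        self.name.clone()
    }
}

struct Literal(i64);
impl PhysicalExpr for Literal {
    fn describe(&self) -> String {
        self.0.to_string()
    }
}

struct BinaryExpr {
    left: PhysicalExprRef,
    op: &'static str,
    right: PhysicalExprRef,
}
impl PhysicalExpr for BinaryExpr {
    fn describe(&self) -> String {
        format!("{} {} {}", self.left.describe(), self.op, self.right.describe())
    }
}

// Before this PR the field was `on: Vec<(Column, Column)>`; with the
// relaxed type, any expression pair can serve as an equijoin key.
fn join_on() -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
    vec![(
        Arc::new(BinaryExpr {
            left: Arc::new(Column { name: "l_col".into() }),
            op: "+",
            right: Arc::new(Literal(1)),
        }) as PhysicalExprRef,
        Arc::new(BinaryExpr {
            left: Arc::new(Column { name: "r_col".into() }),
            op: "+",
            right: Arc::new(Literal(2)),
        }) as PhysicalExprRef,
    )]
}

fn main() {
    for (l, r) in &join_on() {
        println!("{} = {}", l.describe(), r.describe());
    }
}
```

The `as PhysicalExprRef` coercions mirror the `as _` casts seen later in the diff: `Arc<BinaryExpr>` must be unsized to `Arc<dyn PhysicalExpr>` explicitly in some positions.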
👍
```diff
-            })
-            .unzip();
+        let (left_expr, right_expr) =
+            on.iter().map(|(l, r)| (l.clone(), r.clone())).unzip();
```
In many ways this change makes the code cleaner, as it doesn't have to special-case Column so much.
Here are my benchmark results (no change)
--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query ┃ main_base ┃ relex_sort_merge_join_keys ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1 │ 206.84ms │ 213.29ms │ no change │
│ QQuery 2 │ 45.97ms │ 45.34ms │ no change │
│ QQuery 3 │ 76.92ms │ 74.94ms │ no change │
│ QQuery 4 │ 73.96ms │ 71.63ms │ no change │
│ QQuery 5 │ 122.80ms │ 121.84ms │ no change │
│ QQuery 6 │ 16.43ms │ 16.32ms │ no change │
│ QQuery 7 │ 322.01ms │ 315.56ms │ no change │
│ QQuery 8 │ 80.29ms │ 80.99ms │ no change │
│ QQuery 9 │ 125.44ms │ 125.68ms │ no change │
│ QQuery 10 │ 154.89ms │ 152.95ms │ no change │
│ QQuery 11 │ 35.15ms │ 34.23ms │ no change │
│ QQuery 12 │ 70.34ms │ 69.77ms │ no change │
│ QQuery 13 │ 87.38ms │ 85.58ms │ no change │
│ QQuery 14 │ 27.04ms │ 26.31ms │ no change │
│ QQuery 15 │ 61.48ms │ 60.55ms │ no change │
│ QQuery 16 │ 45.86ms │ 45.40ms │ no change │
│ QQuery 17 │ 149.32ms │ 145.65ms │ no change │
│ QQuery 18 │ 448.61ms │ 456.14ms │ no change │
│ QQuery 19 │ 61.49ms │ 64.97ms │ 1.06x slower │
│ QQuery 20 │ 115.97ms │ 115.43ms │ no change │
│ QQuery 21 │ 355.94ms │ 360.97ms │ no change │
│ QQuery 22 │ 29.36ms │ 29.63ms │ no change │
└──────────────┴───────────┴────────────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main_base) │ 2713.49ms │
│ Total Time (relex_sort_merge_join_keys) │ 2713.16ms │
│ Average Time (main_base) │ 123.34ms │
│ Average Time (relex_sort_merge_join_keys) │ 123.33ms │
│ Queries Faster │ 0 │
│ Queries Slower │ 1 │
│ Queries with No Change │ 21 │
└───────────────────────────────────────────┴───────────┘
Thank you @alamb. Note that "investigate removing projection before join input" is not a necessary step for join support in Comet, because Comet doesn't depend on DataFusion's query planning. But we definitely have other steps to work on for DataFusion Join.
Yes. As mentioned in the PR description, Spark join operators take general expressions as join keys instead of columns only. We need to relax the physical operator join key constraints to make more query cases possible.
That makes sense. I think having DataFusion-planned joins and Spark-planned joins go through different codepaths not only potentially duplicates code, it makes it more likely there are discrepancies that may lead to bugs (e.g. it may be hard to test the Spark path as part of the DataFusion tests). For that reason I suggest we make a ticket describing the overall project, where we can collect other follow-on steps as we come upon them (not that you or the Comet people will actually do all the work; other contributors are likely able to help too, I suspect).
As for the creation of an EPIC-style ticket, I am hesitant because for now I don't know all the steps. I may know the next step, as I saw it while experimenting with the integration of Spark and DataFusion with Comet on SortMergeJoin, but this is a basically new attempt and I am not sure what issues could be encountered after that. Currently I'm working in an incremental style, solving each issue as it appears during the integration.
This makes sense, and in fact just figuring out all the steps is typically a large undertaking in itself. I have had reasonable luck with a partially formed Epic that we incrementally and collaboratively fill out (e.g. #8916 and #3148). If you don't object, I will file such a ticket and we can see how useful / not useful it is.
BTW the reason I am pushing on the epic ticket is twofold:
No, I don't object to it at all; I just feel I don't have many steps to fill in there. 😄 Hope it will be useful. 👍
I will review this as soon as possible, particularly the effects on SHJ.
```rust
let mut columns = vec![];
left.apply(&mut |expr| {
    Ok({
        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
            columns.push(column.clone());
        }
        VisitRecursion::Continue
    })
})
.unwrap();
```
For this section, I think you can use the `collect_columns` util.
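To illustrate what such a utility replaces, here is a minimal sketch of a `collect_columns`-style traversal over a toy expression enum (not DataFusion's actual `PhysicalExpr` tree or its real `collect_columns` helper):

```rust
// Toy expression tree standing in for DataFusion's physical expressions.
#[derive(Debug, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Binary(Box<Expr>, &'static str, Box<Expr>),
}

// Recursively gather every column reference in an expression -- the same
// job as the manual `apply` + downcast loop in the diff above.
fn collect_columns(expr: &Expr, out: &mut Vec<String>) {
    match expr {
        Expr::Column(name) => out.push(name.clone()),
        Expr::Literal(_) => {}
        Expr::Binary(left, _, right) => {
            collect_columns(left, out);
            collect_columns(right, out);
        }
    }
}

fn main() {
    // Join key expression `l_col + 1`: only `l_col` is a column reference.
    let key = Expr::Binary(
        Box::new(Expr::Column("l_col".into())),
        "+",
        Box::new(Expr::Literal(1)),
    );
    let mut columns = vec![];
    collect_columns(&key, &mut columns);
    println!("{:?}", columns);
}
```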
```rust
let mut columns = vec![];
right
    .apply(&mut |expr| {
        Ok({
            if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                columns.push(column.clone());
            }
            VisitRecursion::Continue
        })
    })
    .unwrap();
columns
```
Similar to comment above
```diff
@@ -1886,8 +1887,8 @@ pub(crate) mod tests {

     // Join on (a == b1)
     let join_on = vec![(
-        Column::new_with_schema("a", &schema()).unwrap(),
-        Column::new_with_schema("b1", &right.schema()).unwrap(),
+        Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
```
I think we can use the `col` function (which is in the file `datafusion/physical_expr/src/expressions/column.rs`) here, as `col("a", &schema()).unwrap()` instead of `Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _`. However, there are lots of changes like this, and it is purely stylistic; this change is not important.
Thanks @viirya for this PR. I left some minor comments; however, they are not essential. LGTM!
I merged this PR up from main to pick up the clippy fix
Thanks again @viirya
Thank you @alamb @mustafasrepo
I wrote up #9056 (comment). While doing so, I wondered if Comet could use a custom PhysicalOptimizer pass to add a ProjectionExec 🤔
As I mentioned in the description, adding an additional Projection under a Join (e.g., SortMergeJoin) doesn't make a lot of sense for Spark due to its distributed nature. An additional Projection means we materialize extra columns (the join key expressions) earlier, which could lead to additional data in shuffle/sort. So what this PR does is relax the join key constraints in DataFusion; for example, the physical SortMergeJoin can now take expressions other than Column as join keys. On the Spark/Comet side, we can now translate Spark SortMergeJoin directly to DataFusion SortMergeJoin without changing the Spark physical query structure.
🤔
I don't understand this. If the join is on expressions, wouldn't it actually make more sense to compute the expressions prior to the networked shuffle, so only 2 columns of data need to be shuffled?
Hmm, besides the join keys, I think you can still list other columns (e.g., the original 4 columns) in the selection list? So they cannot always be removed from the shuffle, I think? For example:
Sure, if the columns are used elsewhere in the plan, they can't be removed. I was thinking of the case when they aren't used anywhere else. However, I am not sure how often that happens in the real world. Maybe a range join 🤔

```sql
SELECT ...
FROM stocks JOIN splits ON (
    stocks.symbol = splits.symbol AND
    stocks.ts < splits.max_time AND stocks.ts > splits.min_time
)
```

But I suppose those expressions aren't equijoin predicates anyway 🤔
So in Spark the physical query plan doesn't have an additional Projection under the Join operator for the join key expressions; generally we would like to keep it as-is without changing the query plan structure, to avoid introducing unknown issues or performance regressions. As this PR relaxes the join key constraints for DataFusion Join operators, we don't need to add such a Projection when translating the Spark query plan in Comet.
Yeah, but presumably you will have to update the Join operators in DataFusion to take Expressions (rather than
I think this is already done by this PR. The Join operators in DataFusion actually already evaluate join key expressions (previously they were
I reviewed this PR and read the comments above. I think use Some databases materialize
Which issue does this PR close?
Closes #.
Rationale for this change
Currently the join keys of join operators like `SortMergeJoin` are restricted to be `Column`. But it is common to use expressions (e.g., `l_col + 1 = r_col + 2`) rather than simple columns as join keys. From the query plan, DataFusion seems to add an additional `Project` under the join operator which projects the expressions into columns, so that the join operators take join keys as columns. However, in other query engines, e.g., Spark, the query plan doesn't have the additional projection; the join operators directly take general expressions as join keys. (Note that adding an additional projection before a join in Spark means more data to be shuffled/sorted, which can be bad for performance.)
That means we cannot delegate such join operators to DataFusion physical join operators, which require join keys to be columns.
This patch relaxes this join key constraint of the physical join operators, so we can construct DataFusion physical join operators using general expressions as join keys.
This patch doesn't change how DataFusion plans the join operators. I.e., DataFusion still plans a join operation with non-column join keys as a projection + a join operator with columns. (We can probably remove this additional projection later if it also adds cost to DataFusion. Currently I'm not sure if/how DataFusion plans partitioning for the join operators.)
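To make the rationale concrete: with expression join keys, the operator evaluates the key expression on each input row itself instead of relying on an upstream projection to materialize it as a column. The toy hash join below sketches this (hypothetical code, not DataFusion's implementation; the key closures stand in for compiled physical expressions like `l_col + 1`):

```rust
use std::collections::HashMap;

// Toy hash join that evaluates arbitrary key expressions per row.
// `left_key`/`right_key` stand in for compiled PhysicalExprs, so the
// inputs need no projection that pre-computes the key columns.
fn hash_join<L: Copy, R: Copy>(
    left: &[L],
    right: &[R],
    left_key: impl Fn(&L) -> i64,  // e.g. l_col + 1
    right_key: impl Fn(&R) -> i64, // e.g. r_col + 2
) -> Vec<(L, R)> {
    // Build phase: hash the left side on its computed key.
    let mut table: HashMap<i64, Vec<L>> = HashMap::new();
    for row in left {
        table.entry(left_key(row)).or_default().push(*row);
    }
    // Probe phase: compute the right key per row and look it up.
    let mut out = vec![];
    for r in right {
        if let Some(matches) = table.get(&right_key(r)) {
            for l in matches {
                out.push((*l, *r));
            }
        }
    }
    out
}

fn main() {
    let left = [1i64, 2, 3];
    let right = [0i64, 1, 2];
    // Join on `l + 1 = r + 2`, i.e. l = r + 1.
    let joined = hash_join(&left, &right, |l| l + 1, |r| r + 2);
    println!("{:?}", joined);
}
```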
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?