[SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places #7408
…Ordering everywhere
This patch renames RowOrdering to InterpretedOrdering and updates a few operators to use SparkPlan.newOrdering instead of manually constructing a RowOrdering.
@@ -158,7 +158,7 @@ case class Exchange(
       val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
         child.execute().mapPartitions { iter =>
           val hashExpressions = newMutableProjection(expressions, child.output)()
-          iter.map(r => (hashExpressions(r).copy(), r.copy()))
+          iter.map(r => (hashExpressions(r).copy, r.copy()))
Hm, since copy is a verb, I think it is better to keep the (). Also, we defined copy with ().
I didn't mean to make this change in this PR, will back out this one line.
LGTM other than that nit.
LGTM
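For context on the nit above, here is a minimal sketch of the convention being discussed, and of why the copy matters at all. `ReusableRow` and `CopyDemo` are hypothetical names for illustration, not Spark classes, and the idea that the projection reuses a single output row is an assumption based on the `needToCopyObjectsBeforeShuffle` check in the diff.

```scala
// Hypothetical stand-in for a row that a mutable projection overwrites in place.
final class ReusableRow(var value: Int) {
  // Declared with (): copy is a verb and allocates a new object.
  def copy(): ReusableRow = new ReusableRow(value)
  // Declared without (): a side-effect-free accessor.
  def isZero: Boolean = value == 0
}

object CopyDemo {
  // Mimics an operator that writes every result into the same row instance.
  private def projectedRows(): Iterator[ReusableRow] = {
    val reused = new ReusableRow(0)
    Iterator(1, 2, 3).map { v => reused.value = v; reused }
  }

  // Without copy(), all buffered references point at one object, so every
  // element shows the last value written.
  val aliased: List[Int] = projectedRows().toList.map(_.value)

  // With copy(), each buffered element is an independent snapshot.
  val copied: List[Int] = projectedRows().map(_.copy()).toList.map(_.value)

  def main(args: Array[String]): Unit = {
    println(s"aliased = $aliased")
    println(s"copied  = $copied")
  }
}
```

Keeping the parentheses at the call site (`r.copy()`) signals that the call does work and returns a fresh object, which is the reviewer's point.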
Ah, looks like I may have to move some calls so that we don't try to serialize the generated ordering:
Aha, I see the problem: we're trying to serialize the GeneratedOrdering when passing it into the RangePartitioner. I'll see if I can come up with a workaround for this.
I came up with a workaround which allows GeneratedOrdering to be used as the ordering when computing bounds in the RangePartitioner. Using a generated ordering might not be a huge win here, so I'd be okay with backing out of this change if someone thinks I should.
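As a rough illustration of the kind of workaround described above (this is not the code from this PR), a wrapper can serialize only the inputs needed to build the ordering and rebuild the ordering lazily in whichever JVM first uses it. `GeneratedIntOrdering`, `LazilyGeneratedOrdering`, and `OrderingWrapperDemo` are hypothetical stand-ins; the real GeneratedOrdering is compiled at runtime from sort expressions, which a Boolean flag only approximates.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Comparator

// Hypothetical stand-in for a code-generated ordering. It holds a plain
// anonymous Comparator, which is not java.io.Serializable, so Java
// serialization of this object fails.
final class GeneratedIntOrdering(ascending: Boolean) extends Ordering[Int] {
  private val compiled: Comparator[Integer] = new Comparator[Integer] {
    override def compare(a: Integer, b: Integer): Int =
      if (ascending) a.compareTo(b) else b.compareTo(a)
  }
  override def compare(a: Int, b: Int): Int = compiled.compare(a, b)
}

// Wrapper: only the `ascending` flag is written out. The underlying ordering
// is transient and is rebuilt on first use after deserialization.
final class LazilyGeneratedOrdering(ascending: Boolean) extends Ordering[Int] {
  @transient private lazy val underlying: Ordering[Int] =
    new GeneratedIntOrdering(ascending)
  override def compare(a: Int, b: Int): Int = underlying.compare(a, b)
}

object OrderingWrapperDemo {
  // Java-serializes a value and reads it back, as shipping it to another
  // JVM would.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(new LazilyGeneratedOrdering(ascending = false))
    println(List(1, 3, 2).sorted(shipped))
  }
}
```

The cost of this design is one extra virtual call per comparison plus the lazy-initialization check, which is the overhead question raised later in this thread.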
Test build #37298 timed out for PR 7408 at commit
Test build #37311 has finished for PR 7408 at commit
Ah, looks like we have the same problem in
Alright, I've updated this to add a similar wrapper in
Test build #37672 has finished for PR 7408 at commit
@JoshRosen how about making the generated Ordering serializable? We can make the generated class implement
@cloud-fan I don't think this would work, because the generated class only exists in the JVM that generated it.
Or we can always return a wrapper for the generated ordering; I'm not sure what the overhead of the wrapper would be.
I'm going to close this for now; this doesn't seem like a big win and is actually kind of tricky to implement.
This pull request enables Unsafe mode by default in Spark SQL. In order to do this, we had to fix a number of small issues:

**List of fixed blockers**:

- [x] Make some default buffer sizes configurable so that HiveCompatibilitySuite can run properly (#7741).
- [x] Memory leak on grouped aggregation of empty input (fixed by #7560).
- [x] Update planner to also check whether codegen is enabled before planning unsafe operators.
- [x] Investigate failing HiveThriftBinaryServerSuite test. This turns out to be caused by a ClassCastException that occurs when Exchange tries to apply an interpreted RowOrdering to an UnsafeRow when range partitioning an RDD. This could be fixed by #7408, but a shorter-term fix is to just skip the Unsafe exchange path when RangePartitioner is used.
- [x] Memory leak exceptions masking exceptions that actually caused tasks to fail (will be fixed by #7603).
- [x] ~~https://issues.apache.org/jira/browse/SPARK-9162, to implement code generation for ScalaUDF. This is necessary for `UDFSuite` to pass. For now, I've just ignored this test in order to try to find other problems while we wait for a fix.~~ This is no longer necessary as of #7682.
- [x] Memory leaks from Limit after UnsafeExternalSort cause the memory leak detector to fail tests. This is a huge problem in the HiveCompatibilitySuite (fixed by f4ac642a4e5b2a7931c5e04e086bb10e263b1db6).
- [x] Tests in `AggregationQuerySuite` are failing due to NaN-handling issues in UnsafeRow, which were fixed in #7736.
- [x] `org.apache.spark.sql.ColumnExpressionSuite.rand` needs to be updated so that the planner check also matches `TungstenProject`.
- [x] After having lowered the buffer sizes to 4MB so that most of HiveCompatibilitySuite runs:
  - [x] Wrong answer in `join_1to1` (fixed by #7680)
  - [x] Wrong answer in `join_nulls` (fixed by #7680)
  - [x] Managed memory OOM / leak in `lateral_view`
  - [x] Seems to hang indefinitely in `partcols1`. This might be a deadlock in script transformation or a bug in error-handling code? The hang was fixed by #7710.
  - [x] Error while freeing memory in `partcols1`: will be fixed by #7734.
- [x] After fixing the `partcols1` hang, it appears that a number of later tests have issues as well.
- [x] Fix thread-safety bug in codegen fallback expression evaluation (#7759).

Author: Josh Rosen <[email protected]>

Closes #7564 from JoshRosen/unsafe-by-default and squashes the following commits:

83c0c56 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
f4cc859 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
963f567 [Josh Rosen] Reduce buffer size for R tests
d6986de [Josh Rosen] Lower page size in PySpark tests
013b9da [Josh Rosen] Also match TungstenProject in checkNumProjects
5d0b2d3 [Josh Rosen] Add task completion callback to avoid leak in limit after sort
ea250da [Josh Rosen] Disable unsafe Exchange path when RangePartitioning is used
715517b [Josh Rosen] Enable Unsafe by default
I'm going to revive this since I've found a few problems caused by our use of
This is so conflicted that I'm just going to start over and will force-push; for posterity, the old version's HEAD is 15b7d92
Hmm, GitHub won't let me re-open after a force-push. I'll make a new PR.
…Ordering in SMJ

This patch renames `RowOrdering` to `InterpretedOrdering` and updates SortMergeJoin to use the `SparkPlan` methods for constructing its ordering so that it may benefit from codegen. This is an updated version of #7408.

Author: Josh Rosen <[email protected]>

Closes #7973 from JoshRosen/SPARK-9054 and squashes the following commits:

e610655 [Josh Rosen] Add comment RE: Ascending ordering
34b8e0c [Josh Rosen] Import ordering
be19a0f [Josh Rosen] [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places.

(cherry picked from commit 9c87892)
Signed-off-by: Josh Rosen <[email protected]>