
[SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places #7408

Closed
wants to merge 8 commits

Conversation

JoshRosen
Contributor

This patch renames RowOrdering to InterpretedOrdering and updates a few operators to use SparkPlan.newOrdering instead of manually constructing a RowOrdering.

…Ordering everywhere

This patch renames RowOrdering to InterpretedOrdering and updates a few operators to use
SparkPlan.newOrdering instead of manually constructing a RowOrdering.
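As background, an interpreted ordering walks the sort expressions and compares two rows field by field, falling through to the next sort column only on ties, with no code generation involved. A minimal, self-contained sketch of that behavior (the `SortField` and `SimpleInterpretedOrdering` names are illustrative, not Spark's actual classes):

```scala
// Hypothetical sketch, not Spark's real API: each SortField names a column
// position and a direction; the ordering compares rows column by column.
case class SortField(index: Int, ascending: Boolean)

class SimpleInterpretedOrdering(fields: Seq[SortField])
    extends Ordering[Seq[Int]] {
  override def compare(a: Seq[Int], b: Seq[Int]): Int = {
    for (f <- fields) {
      val cmp = a(f.index).compareTo(b(f.index))
      // First non-tying column decides; flip the sign for descending order.
      if (cmp != 0) return if (f.ascending) cmp else -cmp
    }
    0 // all sort columns tied
  }
}
```

`SparkPlan.newOrdering` produces the same comparison logic via codegen when possible, falling back to the interpreted form; the rename makes that fallback role explicit.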
@JoshRosen
Contributor Author

/cc @yhuai @rxin

@@ -158,7 +158,7 @@ case class Exchange(
     val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
       child.execute().mapPartitions { iter =>
         val hashExpressions = newMutableProjection(expressions, child.output)()
-        iter.map(r => (hashExpressions(r).copy(), r.copy()))
+        iter.map(r => (hashExpressions(r).copy, r.copy()))
Contributor


Hm, since copy is a verb, it is better to keep the (), I think. Also, we defined copy with ().

Contributor Author


I didn't mean to make this change in this PR, will back out this one line.

@rxin
Contributor

rxin commented Jul 15, 2015

LGTM other than that nit.

@yhuai
Contributor

yhuai commented Jul 15, 2015

LGTM

@JoshRosen
Contributor Author

Ah, looks like I may have to move some calls so that we don't try to serialize the generated ordering:

[info] - sort followed by limit *** FAILED *** (97 milliseconds)
[info]   Exception thrown while executing Spark plan to calculate expected answer:
[info]    Limit 10
[info]    !Sort ['a ASC], true
[info]     LocalTableScan [a#457], [[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100]]
[info]   
[info]    == Exception ==
[info]    org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: SC
[info]      - field (class "SC$SpecificOrdering", name: "this$0", type: "class SC")
[info]      - object (class "SC$SpecificOrdering", SC$SpecificOrdering@1edd238a)
[info]      - field (class "org.apache.spark.util.CollectionsUtils$$anonfun$makeBinarySearch$8", name: "comparator$1", type: "interface java.util.Comparator")
[info]      - object (class "org.apache.spark.util.CollectionsUtils$$anonfun$makeBinarySearch$8", <function2>)
[info]      - field (class "org.apache.spark.RangePartitioner", name: "org$apache$spark$RangePartitioner$$binarySearch", type: "interface scala.Function2")
[info]      - custom writeObject data (class "org.apache.spark.RangePartitioner")
[info]      - object (class "org.apache.spark.RangePartitioner", org.apache.spark.RangePartitioner@50cf16c6)
[info]      - field (class "org.apache.spark.ShuffleDependency", name: "partitioner", type: "class org.apache.spark.Partitioner")
[info]      - object (class "org.apache.spark.ShuffleDependency", org.apache.spark.ShuffleDependency@5d3944b0)
[info]      - field (class "scala.Tuple2", name: "_2", type: "class java.lang.Object")
[info]      - root object (class "scala.Tuple2", (MapPartitionsRDD[424] at apply at Transformer.scala:22,org.apache.spark.ShuffleDependency@5d3944b0))
[info]    org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: SC
[info]      - field (class "SC$SpecificOrdering", name: "this$0", type: "class SC")
[info]      - object (class "SC$SpecificOrdering", SC$SpecificOrdering@1edd238a)
[info]      - field (class "org.apache.spark.util.CollectionsUtils$$anonfun$makeBinarySearch$8", name: "comparator$1", type: "interface java.util.Comparator")

@JoshRosen
Contributor Author

Aha, I see the problem: we're trying to serialize the GeneratedOrdering when passing it into the RangePartitioner. I'll see if I can come up with a workaround for this.

@JoshRosen
Contributor Author

I came up with a workaround which allows GeneratedOrdering to be used as the ordering when computing bounds in the RangePartitioner. Using a generated ordering might not be a huge win here, so I'd be okay with backing out of this change if someone thinks I should.
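The workaround described above can be sketched as a serializable wrapper that ships only the sort description across the wire and rebuilds the comparator lazily in whichever JVM it lands in, so the non-serializable generated class never travels. This is an illustrative sketch under assumed names (`SortSpec`, `LazilyBuiltOrdering`, `buildOrdering`), not Spark's real API:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the serializable sort description.
case class SortSpec(index: Int, ascending: Boolean)

class LazilyBuiltOrdering(spec: Seq[SortSpec])
    extends Ordering[Seq[Int]] with Serializable {
  // The "generated" comparator is transient: it is rebuilt on first use
  // after deserialization, so it never needs to be serialized itself.
  @transient private lazy val underlying: Ordering[Seq[Int]] = buildOrdering()

  // Stand-in for invoking the code generator; here it just interprets spec.
  private def buildOrdering(): Ordering[Seq[Int]] = new Ordering[Seq[Int]] {
    def compare(a: Seq[Int], b: Seq[Int]): Int = {
      for (f <- spec) {
        val cmp = a(f.index).compareTo(b(f.index))
        if (cmp != 0) return if (f.ascending) cmp else -cmp
      }
      0
    }
  }

  override def compare(a: Seq[Int], b: Seq[Int]): Int = underlying.compare(a, b)
}
```

With this shape, a `RangePartitioner` can hold the wrapper and serialize it freely; the cost is one extra virtual call per comparison plus a one-time rebuild per JVM.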

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37298 timed out for PR 7408 at commit 5eb7222 after a configured wait of 175m.

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37311 has finished for PR 7408 at commit 8060594.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Ah, looks like we have the same problem in TakeOrderedAndProject: we have to pass the generated ordering into Spark Core code which accepts the ordering on the driver rather than a closure to create the ordering.

@JoshRosen
Contributor Author

Alright, I've updated this to add a similar wrapper in TakeOrderedAndProject. PTAL and let me know if this looks okay.

@SparkQA

SparkQA commented Jul 17, 2015

Test build #37672 has finished for PR 7408 at commit 15b7d92.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class NaiveBayes(override val uid: String)
    • class PCA(JavaEstimator, HasInputCol, HasOutputCol):
    • class PCAModel(JavaModel):

@cloud-fan
Contributor

@JoshRosen how about making the generated Ordering serializable? We could make the generated class implement Serializable via evaluator.setImplementedInterfaces(Array(classOf[Serializable])) here.
cc @davies, does this work?

@davies
Contributor

davies commented Jul 18, 2015

@cloud-fan I don't think this would work, because the generated class only exists in the JVM that generated it.

@davies
Contributor

davies commented Jul 19, 2015

Alternatively, we could always return a wrapper for the generated ordering; I'm not sure what the wrapper's overhead would be.

@JoshRosen
Contributor Author

I'm going to close this for now; this doesn't seem like a big win and is actually kind of tricky to implement.

@JoshRosen JoshRosen closed this Jul 21, 2015
asfgit pushed a commit that referenced this pull request Jul 30, 2015
This pull request enables Unsafe mode by default in Spark SQL. In order to do this, we had to fix a number of small issues:

**List of fixed blockers**:

- [x] Make some default buffer sizes configurable so that HiveCompatibilitySuite can run properly (#7741).
- [x] Memory leak on grouped aggregation of empty input (fixed by #7560).
- [x] Update planner to also check whether codegen is enabled before planning unsafe operators.
- [x] Investigate failing HiveThriftBinaryServerSuite test.  This turns out to be caused by a ClassCastException that occurs when Exchange tries to apply an interpreted RowOrdering to an UnsafeRow when range partitioning an RDD.  This could be fixed by #7408, but a shorter-term fix is to just skip the Unsafe exchange path when RangePartitioner is used.
- [x] Memory leak exceptions masking exceptions that actually caused tasks to fail (will be fixed by #7603).
- [x]  ~~https://issues.apache.org/jira/browse/SPARK-9162, to implement code generation for ScalaUDF.  This is necessary for `UDFSuite` to pass.  For now, I've just ignored this test in order to try to find other problems while we wait for a fix.~~ This is no longer necessary as of #7682.
- [x] Memory leaks from Limit after UnsafeExternalSort cause the memory leak detector to fail tests. This is a huge problem in the HiveCompatibilitySuite (fixed by f4ac642a4e5b2a7931c5e04e086bb10e263b1db6).
- [x] Tests in `AggregationQuerySuite` are failing due to NaN-handling issues in UnsafeRow, which were fixed in #7736.
- [x] `org.apache.spark.sql.ColumnExpressionSuite.rand` needs to be updated so that the planner check also matches `TungstenProject`.
- [x] After having lowered the buffer sizes to 4MB so that most of HiveCompatibilitySuite runs:
  - [x] Wrong answer in `join_1to1` (fixed by #7680)
  - [x] Wrong answer in `join_nulls` (fixed by #7680)
  - [x] Managed memory OOM / leak in `lateral_view`
  - [x] Seems to hang indefinitely in `partcols1`.  This might be a deadlock in script transformation or a bug in error-handling code? The hang was fixed by #7710.
  - [x] Error while freeing memory in `partcols1`: will be fixed by #7734.
- [x] After fixing the `partcols1` hang, it appears that a number of later tests have issues as well.
- [x] Fix thread-safety bug in codegen fallback expression evaluation (#7759).

Author: Josh Rosen <[email protected]>

Closes #7564 from JoshRosen/unsafe-by-default and squashes the following commits:

83c0c56 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
f4cc859 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-by-default
963f567 [Josh Rosen] Reduce buffer size for R tests
d6986de [Josh Rosen] Lower page size in PySpark tests
013b9da [Josh Rosen] Also match TungstenProject in checkNumProjects
5d0b2d3 [Josh Rosen] Add task completion callback to avoid leak in limit after sort
ea250da [Josh Rosen] Disable unsafe Exchange path when RangePartitioning is used
715517b [Josh Rosen] Enable Unsafe by default
@JoshRosen JoshRosen changed the title [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering everywhere [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places Aug 5, 2015
@JoshRosen
Contributor Author

I'm going to revive this since I've found a few problems caused by our use of RowOrdering in a few operators.

@JoshRosen
Contributor Author

This branch is so conflicted that I'm just going to start over and force-push; for posterity, the old version's HEAD is 15b7d92.

@JoshRosen
Contributor Author

Hmm, GitHub won't let me re-open after a force-push. I'll make a new PR.

asfgit pushed a commit that referenced this pull request Aug 5, 2015
…Ordering in SMJ

This patch renames `RowOrdering` to `InterpretedOrdering` and updates SortMergeJoin to use the `SparkPlan` methods for constructing its ordering so that it may benefit from codegen.

This is an updated version of #7408.

Author: Josh Rosen <[email protected]>

Closes #7973 from JoshRosen/SPARK-9054 and squashes the following commits:

e610655 [Josh Rosen] Add comment RE: Ascending ordering
34b8e0c [Josh Rosen] Import ordering
be19a0f [Josh Rosen] [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places.

(cherry picked from commit 9c87892)
Signed-off-by: Josh Rosen <[email protected]>