[SPARK-13244][SQL] Migrates DataFrame to Dataset #11443
Conversation
Test build #52234 has finished for PR 11443 at commit
Test build #52235 has finished for PR 11443 at commit
Compilation failure due to incompatible newly merged Spark ML changes. Rebased to fix it.
Test build #52236 has finished for PR 11443 at commit
Temporarily disabled the MiMA check for convenience, since this PR is only for prototyping and it breaks compatibility of tons of APIs.
Test build #52240 has finished for PR 11443 at commit
Test build #52243 has finished for PR 11443 at commit
Do we really need to rename all the DataFrame references in Scala to Dataset[Row]? If we just introduce the type alias, that renaming wouldn't be necessary, would it?
@rxin We don't. As has been done in this PR, we only replace
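For context, the alias approach itself is a one-liner in the `org.apache.spark.sql` package object, roughly:

```scala
package org.apache.spark

package object sql {
  // Existing Scala code that names DataFrame keeps compiling unchanged;
  // Java has no notion of type aliases, so Java callers see Dataset<Row>.
  type DataFrame = Dataset[Row]
}
```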
Test build #52387 has finished for PR 11443 at commit
Test build #52390 has finished for PR 11443 at commit
Test build #52396 has finished for PR 11443 at commit
Test build #52518 has finished for PR 11443 at commit
Test build #52529 has finished for PR 11443 at commit
Test build #52559 has finished for PR 11443 at commit
Finished all steps listed in the PR description; now trying to get all tests to pass.
Test build #52563 has finished for PR 11443 at commit
Test build #52570 has finished for PR 11443 at commit
Test build #52642 has finished for PR 11443 at commit
Test build #52653 has finished for PR 11443 at commit
Test build #52657 has finished for PR 11443 at commit
Test build #52658 has finished for PR 11443 at commit
Test build #52656 has finished for PR 11443 at commit
test this please
test this please
OK. Seems 91fed8e#diff-7a46f10c3cedbf013cf255564d9483cd is the cause of the merge conflict. Let me just resolve it. Looks like we do not need to re-run the tests.
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Test build #2630 has finished for PR 11443 at commit
Test build #52862 has finished for PR 11443 at commit
I am going to merge this to master. |
Let's keep an eye on the build. If there is any issue, let's fix that quickly. |
Test build #52863 has finished for PR 11443 at commit
Test build #52859 has finished for PR 11443 at commit
Test build #52874 has finished for PR 11443 at commit
## What changes were proposed in this pull request?

PR #11443 temporarily disabled the MiMA check; this PR re-enables it. One extra change is that `object DataFrame` is also removed. The only purpose of introducing `object DataFrame` was to use it as an internal factory for creating `Dataset[Row]`. By replacing this internal factory with `Dataset.newDataFrame`, both `DataFrame` and `DataFrame$` are entirely removed from the API, so that we can simply put a `MissingClassProblem` filter in `MimaExcludes.scala` for most DataFrame API changes.

## How was this patch tested?

Tested by the MiMA check triggered by Jenkins.

Author: Cheng Lian <[email protected]>

Closes #11656 from liancheng/re-enable-mima.
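A rough sketch of what such an internal factory looks like (Spark-internal code, only meaningful inside the `org.apache.spark.sql` package; in later Spark versions this method was renamed `Dataset.ofRows`):

```scala
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object Dataset {
  def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
    // Analyze eagerly, then wrap the plan in a Dataset[Row] with a row
    // encoder derived from the analyzed schema; this replaces the old
    // `object DataFrame` factory entirely.
    val qe = sqlContext.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sqlContext, qe, RowEncoder(qe.analyzed.schema))
  }
}
```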
… from QueryExecution.assertAnalyzed

PR #11443 added an extra `plan: Option[LogicalPlan]` argument to `AnalysisException` and attached the partially analyzed plan to the thrown `AnalysisException` in `QueryExecution.assertAnalyzed()`. However, the original stack trace wasn't properly inherited. This PR fixes the issue by inheriting the stack trace. A test case is added to verify that the first entry of the `AnalysisException` stack trace isn't from `QueryExecution`.

Author: Cheng Lian <[email protected]>

Closes #11677 from liancheng/analysis-exception-stacktrace.
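The pattern of the fix, roughly (a hedged sketch of `QueryExecution.assertAnalyzed`, simplified from the actual change):

```scala
def assertAnalyzed(): Unit = {
  try {
    sqlContext.analyzer.checkAnalysis(analyzed)
  } catch {
    case e: AnalysisException =>
      // Re-throw with the partially analyzed plan attached, but copy the
      // original stack trace so the top frames still point at the real
      // failure site rather than at QueryExecution itself.
      val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
      ae.setStackTrace(e.getStackTrace)
      throw ae
  }
}
```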
## What changes were proposed in this pull request?

This PR removes two methods, `collectRows()` and `takeRows()`, from `Dataset[T]`. These methods were added in PR #11443 and were later considered not useful.

## How was this patch tested?

Existing tests should do the work.

Author: Cheng Lian <[email protected]>

Closes #11678 from liancheng/remove-collect-rows-and-take-rows.
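With these removed, Java callers fall back to the list-returning variants that remain on `Dataset[T]`; for example (assuming a `SparkSession` named `spark`):

```scala
val df = spark.range(3).toDF("id")

// Java-friendly replacements: return java.util.List instead of a generic array.
val all: java.util.List[org.apache.spark.sql.Row] = df.collectAsList()
val firstTwo: java.util.List[org.apache.spark.sql.Row] = df.takeAsList(2)
```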
## What changes were proposed in this pull request?

This PR unifies DataFrame and Dataset by migrating existing DataFrame operations to Dataset and making `DataFrame` a type alias of `Dataset[Row]`.

Most Scala code changes are source compatible, but the Java API is broken, as Java knows nothing about Scala type aliases (mostly replacing `DataFrame` with `Dataset<Row>`).

There are several noticeable API changes related to methods returning arrays:

1. `collect`/`take`

   - Old APIs in class `DataFrame`:

     ```scala
     def collect(): Array[Row]
     def take(n: Int): Array[Row]
     ```

   - New APIs in class `Dataset[T]`:

     ```scala
     def collect(): Array[T]
     def take(n: Int): Array[T]
     def collectRows(): Array[Row]
     def takeRows(n: Int): Array[Row]
     ```

   Two specialized methods `collectRows` and `takeRows` are added because Java doesn't support returning generic arrays. Thus, for example, `DataFrame.collect(): Array[T]` actually returns `Object` instead of `Array<T>` on the Java side. Normally, Java users may fall back to `collectAsList` and `takeAsList`. The two new specialized versions are added to avoid performance regression in ML related code (but maybe I'm wrong and they are not necessary here).

2. `randomSplit`

   - Old APIs in class `DataFrame`:

     ```scala
     def randomSplit(weights: Array[Double], seed: Long): Array[DataFrame]
     def randomSplit(weights: Array[Double]): Array[DataFrame]
     ```

   - New APIs in class `Dataset[T]`:

     ```scala
     def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
     def randomSplit(weights: Array[Double]): Array[Dataset[T]]
     ```

   Similar problem as above, but it hasn't been addressed for the Java API yet. We can probably add `randomSplitAsList` to fix this one.

3. `groupBy`

   Some original `DataFrame.groupBy` methods have signatures conflicting with the original `Dataset.groupBy` methods. To distinguish the two, the typed `Dataset.groupBy` methods are renamed to `groupByKey` (see the sketch after this description).

Other noticeable changes:

1. Dataset always does eager analysis now.

   We used to support disabling DataFrame eager analysis to help report the partially analyzed malformed logical plan on analysis failure. However, Dataset encoders require eager analysis during Dataset construction. To preserve the error reporting feature, `AnalysisException` now takes an extra `Option[LogicalPlan]` argument to hold the partially analyzed plan, so that we can check the plan tree when reporting test failures. This plan is passed by `QueryExecution.assertAnalyzed`.

## How was this patch tested?

Existing tests do the work.

## TODO

- [ ] Fix all tests
- [ ] Re-enable MiMA check
- [ ] Update ScalaDoc (`since`, `group`, and example code)

Author: Cheng Lian <[email protected]>
Author: Yin Huai <[email protected]>
Author: Wenchen Fan <[email protected]>
Author: Cheng Lian <[email protected]>

Closes #11443 from liancheng/ds-to-df.
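To make the `groupBy`/`groupByKey` split concrete, a small sketch (assumes a `SparkSession` named `spark`; method names as in the final API):

```scala
import spark.implicits._

val ds = Seq(("a", 1), ("a", 2), ("b", 3)).toDS()

// Untyped, column-based grouping (the old DataFrame.groupBy):
val untyped = ds.groupBy($"_1").count()

// Typed, function-based grouping, renamed from Dataset.groupBy to groupByKey:
val typed = ds.groupByKey(_._1).count()
```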
* }}}
* @since 1.6.0
*/
def select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1] = {
I don't think this is ever called. `select(Column*)` will always be preferred: https://gist.github.com/vlad17/93f1cb57aad42eb7de33f92d6282a44f
I tried a simpler case; seems this works?
val ds = ....
ds.select($"a") // will call select(Column*)
ds.select($"a".as[Int]) // will call select[U1: Encoder](c1: TypedColumn[T, U1])
Weird, I tried your example, and it does fail. Seems the `TypedColumn` returned by `Aggregator.toColumn` can't work here.
@vlad17 The reason why the snippet in your Gist fails is that `(1 to 10).toDS` is a `Dataset[Int]`, while `agg.toColumn` is a `TypedColumn[Long, Long]`. Thus the `select` call is dispatched to the untyped one. The following works:

scala> spark.range(10).as[Long].select(agg.toColumn).show()
+---------------+
|$anon$1(bigint)|
+---------------+
|             10|
+---------------+
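For reference, `agg` is never defined in this thread; a count-style `Aggregator` along the following lines would yield a `TypedColumn[Long, Long]` consistent with the output shown above (a hypothetical stand-in, written against the Spark 2.0 `Aggregator` API):

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical stand-in for `agg`: counts the Long inputs it sees, so
// agg.toColumn is a TypedColumn[Long, Long] and yields 10 on range(10).
val agg = new Aggregator[Long, Long, Long] {
  def zero: Long = 0L
  def reduce(count: Long, in: Long): Long = count + 1
  def merge(c1: Long, c2: Long): Long = c1 + c2
  def finish(count: Long): Long = count
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
```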
@liancheng Yup, I suppose that's working as expected then. It's a bit confusing, since `Aggregator` has an `ImplicitCastInputTypes` mixin.
Perhaps it would be better for `c1` to be `TypedColumn[_, U1]`?
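Sketched as a signature, that suggestion would loosen the typed `select` like this (hypothetical; not what the PR ships):

```scala
import org.apache.spark.sql.{Dataset, Encoder, TypedColumn}

trait TypedSelectSketch[T] {
  // Hypothetical relaxation: accept a TypedColumn with any declared input
  // type, so an aggregator column typed against Long would dispatch to the
  // typed overload even on a Dataset[Int].
  def select[U1: Encoder](c1: TypedColumn[_, U1]): Dataset[U1]
}
```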