
[SPARK-13244][SQL] Migrates DataFrame to Dataset #11443

Closed (wants to merge 37 commits)

Conversation

@liancheng (Contributor) commented Mar 1, 2016

## What changes were proposed in this pull request?

This PR unifies DataFrame and Dataset by migrating existing DataFrame operations to Dataset and making `DataFrame` a type alias of `Dataset[Row]`.

Most Scala code changes are source compatible, but the Java API is broken, since Java knows nothing about Scala type aliases (the fix is mostly replacing `DataFrame` with `Dataset<Row>`).

There are several noticeable API changes related to methods returning arrays:

1.  `collect`/`take`

    -   Old APIs in class `DataFrame`:

        ```scala
        def collect(): Array[Row]
        def take(n: Int): Array[Row]
        ```

    -   New APIs in class `Dataset[T]`:

        ```scala
        def collect(): Array[T]
        def take(n: Int): Array[T]

        def collectRows(): Array[Row]
        def takeRows(n: Int): Array[Row]
        ```

    Two specialized methods, `collectRows` and `takeRows`, are added because Java doesn't support returning generic arrays: for example, `Dataset[T].collect(): Array[T]` actually returns `Object` instead of `Array<T>` on the Java side.

    Normally, Java users may fall back to `collectAsList` and `takeAsList`. The two new specialized versions are added to avoid performance regressions in ML-related code (but maybe I'm wrong and they are not necessary here). A usage sketch follows this list.

2.  `randomSplit`

    -   Old APIs in class `DataFrame`:

        ```scala
        def randomSplit(weights: Array[Double], seed: Long): Array[DataFrame]
        def randomSplit(weights: Array[Double]): Array[DataFrame]
        ```

    -   New APIs in class `Dataset[T]`:

        ```scala
        def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
        def randomSplit(weights: Array[Double]): Array[Dataset[T]]
        ```

    Similar problem as above, but it hasn't been addressed for the Java API yet. We can probably add `randomSplitAsList` to fix this one (sketched after this list).

3.  `groupBy`

    Some original `DataFrame.groupBy` methods have signatures that conflict with the original `Dataset.groupBy` methods. To distinguish the two, the typed `Dataset.groupBy` methods are renamed to `groupByKey` (also shown in the sketch below).
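To make the new API surface concrete, here is a hedged usage sketch of the three points above. It assumes a `SQLContext` named `sqlContext` with its implicits in scope; `Person` is a made-up example class:

```scala
import org.apache.spark.sql.{Dataset, Row}

case class Person(name: String, age: Int)

import sqlContext.implicits._  // assumes a SQLContext named sqlContext

val people: Dataset[Person] = Seq(Person("Alice", 29), Person("Bob", 31)).toDS()
val df: Dataset[Row] = people.toDF()  // DataFrame is now just Dataset[Row]

// 1. collect/take: typed results from Scala; the Row-specialized variants exist
//    because the erased signature of collect(): Array[T] returns Object to Java.
val persons: Array[Person] = people.collect()
val rows: Array[Row] = df.collectRows()
val firstRow: Array[Row] = df.takeRows(1)

// 3. groupBy vs. groupByKey: untyped column-based grouping keeps the old name,
//    while typed grouping by a key function is renamed groupByKey.
df.groupBy($"name").count()        // untyped: resolves columns, returns Dataset[Row]
people.groupByKey(_.name).count()  // typed: groups by a Scala function over Person
```

For point 2, a hypothetical `randomSplitAsList` (not part of this PR) could wrap the generic array for Java callers, e.g. inside `class Dataset[T]`:

```scala
// Hypothetical Java-friendly wrapper over the array-returning randomSplit:
def randomSplitAsList(weights: Array[Double], seed: Long): java.util.List[Dataset[T]] =
  java.util.Arrays.asList(randomSplit(weights, seed): _*)
```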

Other noticeable changes:

1.  Datasets always do eager analysis now

    We used to support disabling DataFrame eager analysis to help report the partially analyzed, malformed logical plan on analysis failure. However, Dataset encoders require eager analysis during Dataset construction. To preserve the error reporting feature, `AnalysisException` now takes an extra `Option[LogicalPlan]` argument to hold the partially analyzed plan, so that we can check the plan tree when reporting test failures. This plan is passed in by `QueryExecution.assertAnalyzed`. A minimal sketch follows.
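A minimal sketch of the shape of this change (assumed signatures, for illustration only):

```scala
import org.apache.spark.sql.AnalysisException

// Sketch of QueryExecution.assertAnalyzed (assumed shape): on failure, re-throw
// with the partially analyzed plan attached so tests can inspect the plan tree.
def assertAnalyzed(): Unit = {
  try {
    analyzer.checkAnalysis(analyzed)  // `analyzer` and `analyzed` assumed in scope
  } catch {
    case e: AnalysisException =>
      throw new AnalysisException(e.message, e.line, e.startPosition, plan = Some(analyzed))
  }
}
```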

## How was this patch tested?

Existing tests do the work.

## TODO

- [ ] Fix all tests
- [ ] Re-enable MiMA check
- [ ] Update ScalaDoc (`@since`, `@group`, and example code)

@SparkQA commented Mar 1, 2016

Test build #52234 has finished for PR 11443 at commit 7fa44d7.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng (Contributor, Author):

cc @marmbrus @yhuai @rxin @cloud-fan

@SparkQA commented Mar 1, 2016

Test build #52235 has finished for PR 11443 at commit cbb0852.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng (Contributor, Author):

The compilation failure was due to newly merged but incompatible Spark ML changes. Rebased to fix it.

@SparkQA commented Mar 1, 2016

Test build #52236 has finished for PR 11443 at commit 716bc21.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng (Contributor, Author):

Temporarily disabled the MiMA check for convenience, since this PR is only for prototyping and it breaks compatibility of tons of APIs.

@SparkQA commented Mar 1, 2016

Test build #52240 has finished for PR 11443 at commit 4080068.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 1, 2016

Test build #52243 has finished for PR 11443 at commit bb8373d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Mar 3, 2016

Do we really need to rename all the DataFrame references in Scala to Dataset[Row]? If we just introduce the type alias, those renamings wouldn't be necessary, would they?

@liancheng (Contributor, Author):

@rxin We don't. As done in this PR, we only replace `DataFrame` with `Dataset<Row>` on the Java side.
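Concretely, the alias can live in the `sql` package object so that existing Scala sources keep compiling unchanged (a sketch of the idea):

```scala
package org.apache.spark

package object sql {
  /** DataFrame is simply a type alias of Dataset[Row]. */
  type DataFrame = Dataset[Row]
}
```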

@SparkQA commented Mar 3, 2016

Test build #52387 has finished for PR 11443 at commit 69695fb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 3, 2016

Test build #52390 has finished for PR 11443 at commit 2bc521a.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 3, 2016

Test build #52396 has finished for PR 11443 at commit d132062.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 5, 2016

Test build #52518 has finished for PR 11443 at commit 271d160.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 6, 2016

Test build #52529 has finished for PR 11443 at commit 34924ea.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 7, 2016

Test build #52559 has finished for PR 11443 at commit f2f3ea8.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng (Contributor, Author):

Finished all steps listed in the PR description; now trying to get all tests to pass.

@SparkQA commented Mar 7, 2016

Test build #52563 has finished for PR 11443 at commit d9ae0dd.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 7, 2016

Test build #52570 has finished for PR 11443 at commit 1ee0463.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 8, 2016

Test build #52642 has finished for PR 11443 at commit 4dc95bc.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 8, 2016

Test build #52653 has finished for PR 11443 at commit 46e3340.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CreateExternalRow(children: Seq[Expression], schema: StructType)

@SparkQA commented Mar 8, 2016

Test build #52657 has finished for PR 11443 at commit c90bd96.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 8, 2016

Test build #52658 has finished for PR 11443 at commit 50be32b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 8, 2016

Test build #52656 has finished for PR 11443 at commit 42e7441.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai (Contributor) commented Mar 10, 2016

test this please

@yhuai (Contributor) commented Mar 10, 2016

test this please

@yhuai (Contributor) commented Mar 11, 2016

OK. It seems 91fed8e#diff-7a46f10c3cedbf013cf255564d9483cd is the cause of the merge conflict. Let me just resolve it. Looks like we do not need to re-run the tests.

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

@SparkQA commented Mar 11, 2016

Test build #2630 has finished for PR 11443 at commit d52ce17.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 11, 2016

Test build #52862 has finished for PR 11443 at commit d52ce17.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai (Contributor) commented Mar 11, 2016

I am going to merge this to master.

@yhuai (Contributor) commented Mar 11, 2016

Let's keep an eye on the build. If there is any issue, let's fix that quickly.

@asfgit closed this in 1d54278 on Mar 11, 2016

@SparkQA commented Mar 11, 2016

Test build #52863 has finished for PR 11443 at commit d52ce17.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 11, 2016

Test build #52859 has finished for PR 11443 at commit d52ce17.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 11, 2016

Test build #52874 has finished for PR 11443 at commit 7d29c06.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Mar 11, 2016
## What changes were proposed in this pull request?

PR #11443 temporarily disabled the MiMA check; this PR re-enables it.

One extra change is that `object DataFrame` is also removed. The only purpose of introducing `object DataFrame` was to use it as an internal factory for creating `Dataset[Row]`. By replacing this internal factory with `Dataset.newDataFrame`, both `DataFrame` and `DataFrame$` are entirely removed from the API, so that we can simply put a `MissingClassProblem` filter in `MimaExcludes.scala` for most DataFrame API changes.

## How was this patch tested?

Tested by MiMA check triggered by Jenkins.

Author: Cheng Lian <[email protected]>

Closes #11656 from liancheng/re-enable-mima.
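For reference, MiMa exclusions of this kind are expressed with `ProblemFilters`; a hedged sketch of what such entries in `MimaExcludes.scala` might look like (the `dataFrameExcludes` name is made up for illustration):

```scala
import com.typesafe.tools.mima.core.{MissingClassProblem, ProblemFilters}

// Sketch: with DataFrame and DataFrame$ gone from the bytecode, a single
// MissingClassProblem filter per class covers most of the removed API surface.
val dataFrameExcludes = Seq(
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame$")
)
```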
asfgit pushed a commit that referenced this pull request Mar 12, 2016
… from QueryExecution.assertAnalyzed

PR #11443 added an extra `plan: Option[LogicalPlan]` argument to `AnalysisException` and attached the partially analyzed plan to the `AnalysisException` thrown in `QueryExecution.assertAnalyzed()`. However, the original stack trace wasn't properly inherited. This PR fixes that issue by inheriting the stack trace.

A test case is added to verify that the first entry of `AnalysisException` stack trace isn't from `QueryExecution`.

Author: Cheng Lian <[email protected]>

Closes #11677 from liancheng/analysis-exception-stacktrace.
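The fix described above amounts to copying the original stack trace onto the re-thrown exception, extending the earlier sketch (assumed shape, for illustration only):

```scala
try {
  analyzer.checkAnalysis(analyzed)
} catch {
  case e: AnalysisException =>
    val ae = new AnalysisException(e.message, e.line, e.startPosition, plan = Some(analyzed))
    ae.setStackTrace(e.getStackTrace)  // inherit the original stack trace
    throw ae
}
```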
asfgit pushed a commit that referenced this pull request Mar 13, 2016
## What changes were proposed in this pull request?

This PR removes two methods, `collectRows()` and `takeRows()`, from `Dataset[T]`. These methods were added in PR #11443, and were later considered not useful.

## How was this patch tested?

Existing tests should do the work.

Author: Cheng Lian <[email protected]>

Closes #11678 from liancheng/remove-collect-rows-and-take-rows.
Review thread on `Dataset.select`:

```scala
 * }}}
 * @since 1.6.0
 */
def select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1] = {
```
@vlad17:

I don't think this is ever called. `select(Column*)` will always be preferred:

https://gist.github.com/vlad17/93f1cb57aad42eb7de33f92d6282a44f

Contributor:

I tried a simpler case; it seems this works:

```scala
val ds = ....
ds.select($"a")          // will call select(Column*)
ds.select($"a".as[Int])  // will call select[U1: Encoder](c1: TypedColumn[T, U1])
```

Contributor:

Weird, I tried your example, and it does fail. It seems the `TypedColumn` returned by `Aggregator.toColumn` can't work here.

@liancheng (Contributor, Author):

@vlad17 The reason why the snippet in your Gist fails is that `(1 to 10).toDS` is a `Dataset[Int]`, while `agg.toColumn` is a `TypedColumn[Long, Long]`. Thus the `select` call is dispatched to the untyped one. The following works:

```scala
scala> spark.range(10).as[Long].select(agg.toColumn).show()
+---------------+
|$anon$1(bigint)|
+---------------+
|             10|
+---------------+
```

@vlad17:

@liancheng Yup, I suppose that's working as expected then. It's a bit confusing, since `Aggregator` has an `ImplicitCastInputTypes` mixin.

Perhaps it would be better for `c1` to be a `TypedColumn[_, U1]`?
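For readers following along, a hedged sketch of the dispatch rule discussed in this thread (assuming a `SQLContext` named `sqlContext` and its implicits in scope): `TypedColumn[-T, U]` is contravariant in its input type, and that is what decides between the typed and untyped `select` overloads.

```scala
import org.apache.spark.sql.Dataset
import sqlContext.implicits._  // assumes a SQLContext named sqlContext

val ints = Seq(1, 2, 3).toDS()  // Dataset[Int]; the single column is named "value"

// $"value".as[Int] yields a TypedColumn[Any, Int]; since TypedColumn is
// contravariant in its input type, it is also a TypedColumn[Int, Int], so the
// typed select[U1: Encoder] overload is chosen:
val typed: Dataset[Int] = ints.select($"value".as[Int])

// By contrast, a TypedColumn[Long, Long] (e.g. Aggregator[Long, _, _].toColumn)
// is NOT a TypedColumn[Int, Long], so passing it to ints.select falls through
// to the untyped select(Column*), matching the behavior observed above.
```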
