
[SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWriter #21821

Closed
wants to merge 5 commits

Conversation

gatorsmile
Member

What changes were proposed in this pull request?

```Scala
val udf1 = udf((x: Int, y: Int) => x + y)
val df = spark.range(0, 3).toDF("a")
  .withColumn("b", udf1($"a", udf1($"a", lit(10))))
df.cache()
df.write.saveAsTable("t")
```

The cache is not used because the plan no longer matches the cached plan. This is a regression caused by the changes we made in AnalysisBarrier, since not all Analyzer rules are idempotent.
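To see the kind of non-idempotence at play, here is a minimal, self-contained sketch. The types and the rule below are illustrative stand-ins, not Spark's actual Catalyst classes or `HandleNullInputsForUDF`:

```scala
// Toy expression tree standing in for Catalyst expressions (illustrative only).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Udf(child: Expr) extends Expr
// Models the If(IsNull(x), null, udf(x)) wrapper that a null-handling rule adds.
case class NullCheck(child: Expr) extends Expr

// A non-idempotent rule: it re-wraps a Udf even when it is already wrapped.
def handleNullInputs(e: Expr): Expr = e match {
  case Udf(c)       => NullCheck(Udf(handleNullInputs(c)))
  case NullCheck(c) => NullCheck(handleNullInputs(c))
  case a: Attr      => a
}

val once  = handleNullInputs(Udf(Attr("a")))   // NullCheck(Udf(Attr(a)))
val twice = handleNullInputs(once)             // NullCheck(NullCheck(Udf(Attr(a))))
// Re-analysis changed the plan, so an equality-based cache lookup misses.
println(once == twice) // false
```

Because the write path re-ran analysis on an already-analyzed plan, a rule like this mutates it again, and the resulting plan no longer equals the one registered in the cache; wrapping the plan in AnalysisBarrier keeps the analyzer from revisiting it.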

How was this patch tested?

Added a test.

Also found a bug in the DSV1 write path. It is not a regression, so I opened a separate JIRA: https://issues.apache.org/jira/browse/SPARK-24869

```diff
@@ -254,7 +254,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
     if (writer.isPresent) {
       runCommand(df.sparkSession, "save") {
-        WriteToDataSourceV2(writer.get(), df.logicalPlan)
+        WriteToDataSourceV2(writer.get(), df.planWithBarrier)
```

gatorsmile (Member Author)

This change is not needed but it is safe to have.

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93305 has finished for PR 21821 at commit 23ec09f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 20, 2018

Test build #93309 has finished for PR 21821 at commit 4030e17.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

cc @cloud-fan @hvanhovell @rxin

@SparkQA

SparkQA commented Jul 20, 2018

Test build #93319 has finished for PR 21821 at commit 9edc28f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

retest this please

@SparkQA

SparkQA commented Jul 20, 2018

Test build #93323 has finished for PR 21821 at commit 9edc28f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile gatorsmile changed the title [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWriter [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWriter [WIP] Jul 20, 2018
@gatorsmile
Member Author

gatorsmile commented Jul 20, 2018

~~We need a separate rule to eliminate barriers for the write path and CTAS, since the input queries are not always children of these nodes. Thus, the current EliminateBarriers does not work.~~

So far, all the relevant plans since 2.3 have extended DataWritingCommand. Thus, EliminateBarriers will still eliminate the barrier at the end of Analyzer.

@cloud-fan
Contributor

shall we fix the non-idempotent analyzer rule for 2.3.2?

```diff
@@ -891,8 +891,9 @@ object DDLUtils {
    * Throws exception if outputPath tries to overwrite inputpath.
    */
   def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
-    val inputPaths = query.collect {
+    val inputPaths = EliminateBarriers(query).collect {
       case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
```

gatorsmile (Member Author)

AnalysisBarrier is a leaf node. That is one of the reasons it could easily break other code.
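The leaf-node problem can be sketched with a toy plan tree (illustrative stand-ins, not Spark's actual TreeNode classes): because a barrier reports no children, generic traversals like `collect` never see the plan beneath it, which is why `verifyNotReadPath` has to strip barriers first.

```scala
// Toy logical-plan tree (illustrative only).
sealed trait Plan { def children: Seq[Plan] }
case class Relation(path: String) extends Plan { def children = Nil }
case class Project(child: Plan) extends Plan { def children = Seq(child) }
// Like AnalysisBarrier: holds a plan but exposes no children, so traversals stop here.
case class Barrier(hidden: Plan) extends Plan { def children = Nil }

// A collect-style traversal that walks `children`, as query.collect does.
def collectRelations(p: Plan): Seq[Relation] = p match {
  case r: Relation => Seq(r)
  case other       => other.children.flatMap(collectRelations)
}

// Strips barriers, analogous to EliminateBarriers(query) in the patch.
def eliminateBarriers(p: Plan): Plan = p match {
  case Barrier(h)  => eliminateBarriers(h)
  case Project(c)  => Project(eliminateBarriers(c))
  case r: Relation => r
}

val plan = Barrier(Project(Relation("/input")))
println(collectRelations(plan))                    // List(): the barrier hides the relation
println(collectRelations(eliminateBarriers(plan))) // List(Relation(/input))
```

Without the `EliminateBarriers(query)` call, the input-path check would silently find nothing under a barrier and fail to detect that a write overwrites its own input.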

@gatorsmile
Member Author

@cloud-fan This sounds good to me. @maryannxue Could you fix the rule HandleNullInputsForUDF?

@SparkQA

SparkQA commented Jul 22, 2018

Test build #93416 has finished for PR 21821 at commit e8bf33c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maryannxue
Contributor

Yes, @gatorsmile. Code is ready. Will post a PR shortly.

@gatorsmile gatorsmile changed the title [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWriter [WIP] [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWriter Jul 23, 2018
@gatorsmile
Member Author

cc @hvanhovell

@hvanhovell
Contributor

@gatorsmile do we still need this patch if maryann fixes this?

@gatorsmile
Member Author

gatorsmile commented Jul 23, 2018

@hvanhovell The question is whether HandleNullInputsForUDF is the only rule that is non-idempotent. If it is not, we still need to add an AnalysisBarrier. Either way, it sounds like the changes are still safe to apply?

@maryannxue
Contributor

LGTM.

@hvanhovell
Contributor

LGTM

@SparkQA

SparkQA commented Jul 25, 2018

Test build #93553 has finished for PR 21821 at commit 328addd.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mgaido91
Contributor

Is this still valid since #21822 is going on? Shall we have this only on the 2.3 maintenance branch?

@gatorsmile
Member Author

@mgaido91 See the comment #21821 (comment)

@gatorsmile
Member Author

gatorsmile commented Jul 25, 2018

This PR is mainly for the Spark 2.3 branch.

The code changes will be removed from the master branch when #21822 is merged. However, the test cases are still valid.

@mgaido91
Contributor

LGTM

@SparkQA

SparkQA commented Jul 25, 2018

Test build #93555 has finished for PR 21821 at commit ddbd9f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Jul 26, 2018

Author: Xiao Li <[email protected]>

Closes #21821 from gatorsmile/testMaster22.

(cherry picked from commit d2e7deb)
Signed-off-by: Xiao Li <[email protected]>
@asfgit asfgit closed this in d2e7deb Jul 26, 2018
@gatorsmile
Member Author

Thanks! Merged to master/2.3

6 participants