# [SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter
## What changes were proposed in this pull request?
```Scala
val udf1 = udf({(x: Int, y: Int) => x + y})
val df = spark.range(0, 3).toDF("a")
  .withColumn("b", udf1($"a", udf1($"a", lit(10))))
df.cache()
df.write.saveAsTable("t")
```
The cache is not used because the plan that reaches the write path no longer matches the cached plan. This is a regression caused by the AnalysisBarrier changes: the write path re-analyzes the already-analyzed plan, and since not all Analyzer rules are idempotent, the re-analyzed plan differs from the one registered in the cache.
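
To illustrate the failure mode, here is a toy, self-contained model of a cache keyed on an analyzed plan together with a non-idempotent analyzer rule. All of the classes (`Plan`, `Udf`, `NullGuard`) are invented for the sketch and are not Spark code; the point is only that re-analyzing an already-analyzed plan changes it, so the lookup misses:

```scala
// Toy model only: none of these classes exist in Spark. It shows why a cache
// keyed on an analyzed plan misses after a non-idempotent rule runs again.
object NonIdempotentAnalysisDemo {
  sealed trait Plan
  case class Column(name: String) extends Plan
  case class Udf(child: Plan) extends Plan
  case class NullGuard(child: Plan) extends Plan // stands in for the null-check wrapper

  // Non-idempotent rule: it wraps a Udf in a NullGuard even when one is already
  // present, so every analysis pass grows the plan.
  def analyze(p: Plan): Plan = p match {
    case Udf(c)       => NullGuard(Udf(analyze(c)))
    case NullGuard(c) => NullGuard(analyze(c))
    case other        => other
  }

  def main(args: Array[String]): Unit = {
    val analyzed = analyze(Udf(Column("a")))   // NullGuard(Udf(Column(a)))
    val cache    = Map[Plan, String](analyzed -> "cached rows")

    // Re-analyzing the already-analyzed plan, as the write path used to do:
    val reAnalyzed = analyze(analyzed)         // NullGuard(NullGuard(Udf(Column(a))))
    println(cache.contains(analyzed))          // true
    println(cache.contains(reAnalyzed))        // false: the cached data is not used
  }
}
```

Wrapping the analyzed plan in an `AnalysisBarrier` before handing it to the write path, as the change below does via `df.planWithBarrier`, keeps the analyzer from rewriting it a second time, so the cached plan and the written plan stay identical.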

## How was this patch tested?
Added a test.

Also found a bug in the DSV1 write path. It is not a regression, so a separate JIRA was opened: https://issues.apache.org/jira/browse/SPARK-24869

Author: Xiao Li <[email protected]>

Closes #21821 from gatorsmile/testMaster22.
Committed by gatorsmile on Jul 26, 2018 (commit d2e7deb, parent 17f469b).
Showing 3 changed files with 51 additions and 8 deletions.
Changes in `DataFrameWriter` (every write path now hands the barrier-wrapped plan to its command):

```diff
@@ -254,7 +254,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
         if (writer.isPresent) {
           runCommand(df.sparkSession, "save") {
-            WriteToDataSourceV2(writer.get(), df.logicalPlan)
+            WriteToDataSourceV2(writer.get(), df.planWithBarrier)
           }
         }
 
@@ -275,7 +275,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         sparkSession = df.sparkSession,
         className = source,
         partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
+        options = extraOptions.toMap).planForWriting(mode, df.planWithBarrier)
     }
   }
 
@@ -323,7 +323,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       InsertIntoTable(
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
-        query = df.logicalPlan,
+        query = df.planWithBarrier,
         overwrite = mode == SaveMode.Overwrite,
         ifPartitionNotExists = false)
     }
@@ -459,7 +459,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       partitionColumnNames = partitioningColumns.getOrElse(Nil),
       bucketSpec = getBucketSpec)
 
-    runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
+    runCommand(df.sparkSession, "saveAsTable") {
+      CreateTable(tableDesc, mode, Some(df.planWithBarrier))
+    }
   }
 
   /**
```
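
In each of these call sites, `df.planWithBarrier` supplies the already-analyzed plan wrapped in an `AnalysisBarrier`, so the command's own analysis pass leaves it untouched. As a rough sketch of the barrier idea, here are simplified toy classes (`Plan`, `Barrier`, a hand-rolled `transformDown`) that are not Spark's implementation; the barrier is treated as a leaf, so a generic top-down rewrite cannot reach the plan it wraps:

```scala
// Simplified sketch, not Spark's classes: the barrier reports no children, so a
// generic top-down rewrite stops there and leaves the wrapped plan untouched.
object BarrierSketch {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Project(child: Plan) extends Plan
  case class Barrier(child: Plan) extends Plan // treated as a leaf by the traversal

  // Applies the rule top-down, descending into Project but never into Barrier.
  def transformDown(p: Plan)(rule: PartialFunction[Plan, Plan]): Plan =
    rule.applyOrElse(p, identity[Plan]) match {
      case Project(child) => Project(transformDown(child)(rule))
      case leafOrBarrier  => leafOrBarrier
    }

  def main(args: Array[String]): Unit = {
    // A rule that rewrites every Relation it can reach.
    val rename: PartialFunction[Plan, Plan] = { case Relation(n) => Relation(n + "'") }

    println(transformDown(Project(Relation("t")))(rename))          // Project(Relation(t'))
    println(transformDown(Barrier(Project(Relation("t"))))(rename)) // barrier contents unchanged
  }
}
```

Treating the barrier as a leaf is also why the `DDLUtils` change below has to strip barriers before collecting input paths.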
Changes in `DDLUtils`:

```diff
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateBarriers, NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -891,8 +891,9 @@ object DDLUtils {
    * Throws exception if outputPath tries to overwrite inputpath.
    */
   def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
-    val inputPaths = query.collect {
-      case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
+    val inputPaths = EliminateBarriers(query).collect {
+      case LogicalRelation(r: HadoopFsRelation, _, _, _) =>
+        r.location.rootPaths
     }.flatten
 
     if (inputPaths.contains(outputPath)) {
```
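
`verifyNotReadPath` gathers the input paths of the query with `collect`, and a barrier hides its child from that traversal, so the barriers are removed first with `EliminateBarriers`. A minimal sketch of such a strip step, again with simplified toy classes rather than Spark's rule:

```scala
// Simplified sketch, not Spark's EliminateBarriers: strip Barrier wrappers so a
// later traversal can see the relations underneath them.
object EliminateBarriersSketch {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Project(child: Plan) extends Plan
  case class Barrier(child: Plan) extends Plan

  def eliminateBarriers(p: Plan): Plan = p match {
    case Barrier(child) => eliminateBarriers(child)
    case Project(child) => Project(eliminateBarriers(child))
    case leaf           => leaf
  }

  // Collect relation names only after the barriers are gone, mirroring how
  // verifyNotReadPath collects input paths.
  def inputRelations(query: Plan): Seq[String] = eliminateBarriers(query) match {
    case Relation(n)    => Seq(n)
    case Project(child) => inputRelations(child)
    case _              => Nil
  }

  def main(args: Array[String]): Unit = {
    println(inputRelations(Barrier(Project(Relation("t"))))) // List(t)
  }
}
```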
Changes in `sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala` (41 additions, 1 deletion):
```diff
@@ -19,11 +19,16 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.api.java._
 import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, ExplainCommand}
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.functions.{lit, udf}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types.{DataTypes, DoubleType}
+import org.apache.spark.sql.util.QueryExecutionListener
 
 
 private case class FunctionResult(f1: String, f2: String)
@@ -325,6 +330,41 @@ class UDFSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("cached Data should be used in the write path") {
+    withTable("t") {
+      withTempPath { path =>
+        var numTotalCachedHit = 0
+        val listener = new QueryExecutionListener {
+          override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
+
+          override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+            qe.withCachedData match {
+              case c: CreateDataSourceTableAsSelectCommand
+                  if c.query.isInstanceOf[InMemoryRelation] =>
+                numTotalCachedHit += 1
+              case i: InsertIntoHadoopFsRelationCommand
+                  if i.query.isInstanceOf[InMemoryRelation] =>
+                numTotalCachedHit += 1
+              case _ =>
+            }
+          }
+        }
+        spark.listenerManager.register(listener)
+
+        val udf1 = udf({ (x: Int, y: Int) => x + y })
+        val df = spark.range(0, 3).toDF("a")
+          .withColumn("b", udf1($"a", lit(10)))
+        df.cache()
+        df.write.saveAsTable("t")
+        assert(numTotalCachedHit == 1, "expected to be cached in saveAsTable")
+        df.write.insertInto("t")
+        assert(numTotalCachedHit == 2, "expected to be cached in insertInto")
+        df.write.save(path.getCanonicalPath)
+        assert(numTotalCachedHit == 3, "expected to be cached in save for native")
+      }
+    }
+  }
+
   test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
     val udf1 = udf({(x: Int, y: Int) => x + y})
     val df = spark.range(0, 3).toDF("a")
```
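
The new test registers a `QueryExecutionListener` and inspects `qe.withCachedData` to confirm that each of the three write commands reads from the cached `InMemoryRelation`. For completeness, a hedged usage sketch of the same listener pattern outside a test suite; it assumes a running `SparkSession` named `spark` (for example in `spark-shell`) and a throwaway table name `listener_demo`, and it unregisters the listener afterwards so later queries are not observed:

```scala
// Sketch only: assumes an existing SparkSession named `spark` (e.g. spark-shell).
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    // withCachedData is the analyzed plan after cached subplans were substituted.
    println(s"$funcName used plan: ${qe.withCachedData.getClass.getSimpleName}")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

spark.listenerManager.register(listener)
try {
  spark.range(3).write.mode("overwrite").saveAsTable("listener_demo") // hypothetical table name
} finally {
  spark.listenerManager.unregister(listener)
}
```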
