From 23ec09fc3bbedd2f34c594daf461cebd9c0295a6 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Thu, 19 Jul 2018 16:38:44 -0700
Subject: [PATCH 1/4] fix

---
 .../apache/spark/sql/DataFrameWriter.scala    | 10 +++--
 .../scala/org/apache/spark/sql/UDFSuite.scala | 45 ++++++++++++++++++-
 2 files changed, 49 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 90bea2d676e22..99493fc6c0a1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -254,7 +254,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
       if (writer.isPresent) {
         runCommand(df.sparkSession, "save") {
-          WriteToDataSourceV2(writer.get(), df.logicalPlan)
+          WriteToDataSourceV2(writer.get(), df.planWithBarrier)
         }
       }
 
@@ -275,7 +275,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         sparkSession = df.sparkSession,
         className = source,
         partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
+        options = extraOptions.toMap).planForWriting(mode, df.planWithBarrier)
     }
   }
 
@@ -323,7 +323,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     InsertIntoTable(
       table = UnresolvedRelation(tableIdent),
       partition = Map.empty[String, Option[String]],
-      query = df.logicalPlan,
+      query = df.planWithBarrier,
       overwrite = mode == SaveMode.Overwrite,
       ifPartitionNotExists = false)
   }
@@ -459,7 +459,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       partitionColumnNames = partitioningColumns.getOrElse(Nil),
       bucketSpec = getBucketSpec)
 
-    runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
+    runCommand(df.sparkSession, "saveAsTable") {
+      CreateTable(tableDesc, mode, Some(df.planWithBarrier))
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 21afdc7e2a33f..5fd5e1c2be5b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -19,11 +19,17 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.api.java._
 import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, ExplainCommand}
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
+import org.apache.spark.sql.functions.{lit, udf}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types.{DataTypes, DoubleType}
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import scala.collection.mutable.ArrayBuffer
 
 private case class FunctionResult(f1: String, f2: String)
 
@@ -324,4 +330,39 @@ class UDFSuite extends QueryTest with SharedSQLContext {
       assert(outputStream.toString.contains("UDF:f(a._1 AS `_1`)"))
     }
   }
+
+  test("cached Data should be used in the write path") {
+    withTable("t") {
+      withTempPath { path =>
+        var numTotalCachedHit = 0
+        val listener = new QueryExecutionListener {
+          override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
+
+          override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+            qe.withCachedData match {
+              case c: CreateDataSourceTableAsSelectCommand
+                  if c.query.isInstanceOf[InMemoryRelation] =>
+                numTotalCachedHit += 1
+              case i: InsertIntoHadoopFsRelationCommand
+                  if i.query.isInstanceOf[InMemoryRelation] =>
+                numTotalCachedHit += 1
+              case _ =>
+            }
+          }
+        }
+        spark.listenerManager.register(listener)
+
+        val udf1 = udf({ (x: Int, y: Int) => x + y })
+        val df = spark.range(0, 3).toDF("a")
+          .withColumn("b", udf1($"a", lit(10)))
+        df.cache()
+        df.write.saveAsTable("t")
+        assert(numTotalCachedHit == 1, "expected to be cached in saveAsTable")
+        df.write.insertInto("t")
+        assert(numTotalCachedHit == 2, "expected to be cached in insertInto")
+        df.write.save(path.getCanonicalPath)
+        assert(numTotalCachedHit == 3, "expected to be cached in save for native")
+      }
+    }
+  }
 }
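The core change in PATCH 1/4 is mechanical but important: every write path in DataFrameWriter now hands the command `df.planWithBarrier` instead of the raw `df.logicalPlan`. Without the barrier, the already-analyzed plan was re-analyzed inside the write command, producing a plan tree that no longer matched the entry in the cache manager, so cached data was ignored and any UDFs in the query were recomputed during the write. The new UDFSuite test pins this down by checking that `withCachedData` resolves the command's query to an `InMemoryRelation` for `saveAsTable`, `insertInto`, and `save`. A minimal, self-contained sketch of the user-visible behavior this guarantees (the session setup, `expensiveUdf`, and the table name are illustrative, not part of the patch):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{lit, udf}

    object CachedWriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("cached-write-sketch")
          .getOrCreate()
        import spark.implicits._

        // Stand-in for an expensive computation: if the write path re-analyzes
        // the plan, the cache lookup misses and this runs again for every row.
        val expensiveUdf = udf { (x: Long, y: Long) => x + y }

        val df = spark.range(0, 3).toDF("a")
          .withColumn("b", expensiveUdf($"a", lit(10L)))
        df.cache()
        df.count() // materialize the cache

        // With planWithBarrier in place, the writer receives the analyzed plan
        // unchanged, so the cached InMemoryRelation is substituted on write.
        df.write.mode("overwrite").saveAsTable("cached_write_sketch")

        spark.stop()
      }
    }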
From 4030e174e6efeeae6158f13675234ed994276371 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Thu, 19 Jul 2018 17:54:53 -0700
Subject: [PATCH 2/4] remove useless import

---
 sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 5fd5e1c2be5b4..4332ee1396dca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types.{DataTypes, DoubleType}
 import org.apache.spark.sql.util.QueryExecutionListener
 
-import scala.collection.mutable.ArrayBuffer
 
 private case class FunctionResult(f1: String, f2: String)
 

From e8bf33c8ba3c88278ccff24fc5b76677996cd1bd Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Sun, 22 Jul 2018 10:35:58 -0700
Subject: [PATCH 3/4] fix

---
 .../scala/org/apache/spark/sql/execution/command/ddl.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 04bf8c6dd917f..c7f7e4d755cfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateBarriers, NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -891,8 +891,9 @@ object DDLUtils {
    * Throws exception if outputPath tries to overwrite inputpath.
    */
  def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
-    val inputPaths = query.collect {
-      case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
+    val inputPaths = EliminateBarriers(query).collect {
+      case LogicalRelation(r: HadoopFsRelation, _, _, _) =>
+        r.location.rootPaths
     }.flatten
 
     if (inputPaths.contains(outputPath)) {
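PATCH 3/4 follows from the same barrier: in this era of the code, `AnalysisBarrier` is a leaf node, so tree traversals such as `collect` stop at the barrier and never reach the `LogicalRelation`s beneath it. Left alone, `verifyNotReadPath` would collect no input paths from a barrier-wrapped writer plan and silently skip the self-overwrite check. Running the plan through `EliminateBarriers` first restores the traversal. Roughly what that rule does, as a sketch (the rule itself is imported from `org.apache.spark.sql.catalyst.analysis`; the body below is an approximation, not the patch's code):

    import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan}

    // Replace every AnalysisBarrier with its child so that downstream
    // traversals like collect() see the real plan tree again.
    def stripBarriers(plan: LogicalPlan): LogicalPlan = plan transformDown {
      case AnalysisBarrier(child) => child
    }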
From ddbd9f7c796e8bedfbae3141c9c7098370c217ce Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Wed, 25 Jul 2018 11:00:12 -0700
Subject: [PATCH 4/4] fix build failure.

---
 sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 2e1d2eca67c08..30dca9497ddde 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -19,14 +19,10 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.api.java._
 import org.apache.spark.sql.catalyst.plans.logical.Project
-<<<<<<< HEAD
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, ExplainCommand}
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-=======
-import org.apache.spark.sql.execution.command.ExplainCommand
->>>>>>> upstream/master
 import org.apache.spark.sql.functions.{lit, udf}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
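PATCH 4/4 is pure cleanup: a rebase left `<<<<<<< HEAD` / `=======` / `>>>>>>> upstream/master` conflict markers in the UDFSuite imports, and this patch deletes the markers plus the now-duplicate `ExplainCommand` import. For completeness, a sketch of how the `QueryExecutionListener` pattern from the new test is wired up outside a test suite, assuming the same 2.x listener API (the class name, output path, and counting heuristic are illustrative; the test itself relies on the shared test session for teardown rather than explicit unregistration):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    // Counts successful executions whose cache-substituted plan contains an
    // InMemoryRelation, i.e. executions that actually reused cached data.
    class CacheHitListener extends QueryExecutionListener {
      @volatile var hits = 0
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        if (qe.withCachedData.toString.contains("InMemoryRelation")) hits += 1
      }
    }

    object ListenerSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("listener-sketch")
          .getOrCreate()
        val listener = new CacheHitListener
        spark.listenerManager.register(listener)
        try {
          val df = spark.range(10).toDF("a")
          df.cache()
          df.count() // materialize the cache; also fires the listener once
          df.write.mode("overwrite").parquet("/tmp/listener-sketch") // illustrative path
          println(s"cache hits observed: ${listener.hits}")
        } finally {
          spark.listenerManager.unregister(listener) // avoid leaking into later queries
          spark.stop()
        }
      }
    }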