diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 84f3fd360b71b..1026899699082 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -185,6 +185,16 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )
 
+    SQLExecution.checkSQLExecutionId(sparkSession)
+
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
+
+    // This call shouldn't be put into the `try` block below because it only initializes and
+    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+    committer.setupJob(job)
+
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns ++
         writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
@@ -208,16 +218,6 @@ object FileFormatWriter extends Logging {
       }
     }
 
-    SQLExecution.checkSQLExecutionId(sparkSession)
-
-    // propagate the description UUID into the jobs, so that committers
-    // get an ID guaranteed to be unique.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)
-
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-    committer.setupJob(job)
-
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
         (materializedPlan.execute(), None)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index c933ab50d216d..addf6d2134c91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.PrimitiveType
@@ -32,7 +32,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkContext, TestUtils}
+import org.apache.spark.{SparkContext, SparkException, TestUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -1275,4 +1275,26 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       }
     }
   }
+
+  test("SPARK-43327: location exists when insertoverwrite fails") {
+    withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
+      withTable("t", "t1") {
+        sql("CREATE TABLE t(c1 int) USING parquet")
+        sql("CREATE TABLE t1(c2 long) USING parquet")
+        sql("INSERT OVERWRITE TABLE t1 SELECT 6000044164")
+
+        val identifier = TableIdentifier("t")
+        val location = spark.sessionState.catalog.getTableMetadata(identifier).location
+
+        intercept[SparkException] {
+          sql("INSERT OVERWRITE TABLE t SELECT c2 FROM " +
+            "(SELECT cast(c2 as int) as c2 FROM t1 distribute by c2)")
+        }
+        // scalastyle:off hadoopconfiguration
+        val fs = FileSystem.get(location, spark.sparkContext.hadoopConfiguration)
+        // scalastyle:on hadoopconfiguration
+        assert(fs.exists(new Path(location)))
+      }
+    }
+  }
 }