diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 29afe5751bc16..ecfcafe69c1f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -104,20 +104,12 @@ case class DataSource(
    * dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
    * this information, therefore calls to this method should be very cheap, i.e. there won't
    * be any further inference in any triggers.
-   * 4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
-   *    existing table's partitioning scheme. This is achieved by not providing
-   *    `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
-   *    exit, if we don't care about the schema of the original table.
    *
    * @param format the file format object for this DataSource
-   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
-   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
-   *         `null`.
+   *         columns.
    */
-  private def getOrInferFileFormatSchema(
-      format: FileFormat,
-      justPartitioning: Boolean = false): (StructType, StructType) = {
+  private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
     // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
     // in streaming mode, we have already inferred and registered partition columns, we will
     // never have to materialize the lazy val below
@@ -174,9 +166,7 @@ case class DataSource(
         StructType(partitionFields)
       }
     }
-    if (justPartitioning) {
-      return (null, partitionSchema)
-    }
+
     val dataSchema = userSpecifiedSchema.map { schema =>
       val equality = sparkSession.sessionState.conf.resolver
       StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
@@ -434,26 +424,6 @@ case class DataSource(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
 
-    // If we are appending to a table that already exists, make sure the partitioning matches
-    // up. If we fail to load the table for whatever reason, ignore the check.
-    if (mode == SaveMode.Append) {
-      val existingPartitionColumns = Try {
-        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-      }.getOrElse(Seq.empty[String])
-      // TODO: Case sensitivity.
-      val sameColumns =
-        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-      if (existingPartitionColumns.nonEmpty && !sameColumns) {
-        throw new AnalysisException(
-          s"""Requested partitioning does not match existing partitioning.
-             |Existing partitioning columns:
-             |  ${existingPartitionColumns.mkString(", ")}
-             |Requested partitioning columns:
-             |  ${partitionColumns.mkString(", ")}
-             |""".stripMargin)
-      }
-    }
-
     // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
     // not need to have the query as child, to avoid to analyze an optimized query,
     // because InsertIntoHadoopFsRelationCommand will be optimized first.
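
Note on the change: with the `justPartitioning` early exit removed, a caller that only needs the partitioning destructures the returned pair instead of receiving a `(null, partitionSchema)` tuple. A minimal sketch of such a call site, assuming a `format: FileFormat` is in scope (the call site itself is illustrative, not part of this diff):

    // Hypothetical caller: discard the data schema, keep only the partition schema.
    val (_, partitionSchema) = getOrInferFileFormatSchema(format)
    // Replaces the removed path that returned (null, partitionSchema)
    // when justPartitioning was true.
    val partitionColumnNames: Seq[String] = partitionSchema.fieldNames.toSeq

This also means no caller ever sees a `null` data schema, which is what made the `justPartitioning` flag error-prone in the first place.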