[SPARK-18917][SQL] Remove schema check in appending data
rxin committed Jan 17, 2017
1 parent 18ee55d commit 25272e9
Showing 1 changed file with 3 additions and 33 deletions.
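The patch makes two related changes inside `case class DataSource`: it drops the `justPartitioning` flag (and its `(null, partitionSchema)` early exit) from `getOrInferFileFormatSchema`, and it deletes the caller that used the flag, the partition-column check that ran when appending to an existing table.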
```diff
@@ -104,20 +104,12 @@ case class DataSource(
    * dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
    * this information, therefore calls to this method should be very cheap, i.e. there won't
    * be any further inference in any triggers.
-   * 4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
-   *    existing table's partitioning scheme. This is achieved by not providing
-   *    `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
-   *    exit, if we don't care about the schema of the original table.
    *
    * @param format the file format object for this DataSource
-   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
-   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
-   *         `null`.
+   *         columns.
    */
-  private def getOrInferFileFormatSchema(
-      format: FileFormat,
-      justPartitioning: Boolean = false): (StructType, StructType) = {
+  private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
     // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
     // in streaming mode, we have already inferred and registered partition columns, we will
     // never have to materialize the lazy val below
```
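With the flag gone, the method always returns both schemas and callers simply ignore the half they do not need, so the `null` data schema documented above can no longer leak out. A minimal self-contained sketch of the two call shapes (the names mirror the patch, but the object is hypothetical and only assumes the `spark-sql` types on the classpath):

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SchemaPairSketch {
  private val dataSchema = StructType(Seq(StructField("value", StringType)))
  private val partitionSchema = StructType(Seq(StructField("date", StringType)))

  // New shape: always compute and return both schemas.
  def getOrInferFileFormatSchema(): (StructType, StructType) =
    (dataSchema, partitionSchema)

  def main(args: Array[String]): Unit = {
    // A caller that only needs the partitioning ignores the first element...
    val (_, parts) = getOrInferFileFormatSchema()
    println(parts.fieldNames.mkString(", "))  // prints: date

    // ...instead of passing `justPartitioning = true` and receiving a
    // (null, partitionSchema) pair, as the deleted overload did.
  }
}
```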
```diff
@@ -174,9 +166,7 @@ case class DataSource(
         StructType(partitionFields)
       }
     }
-    if (justPartitioning) {
-      return (null, partitionSchema)
-    }
+
     val dataSchema = userSpecifiedSchema.map { schema =>
       val equality = sparkSession.sessionState.conf.resolver
       StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
```
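The context lines above show how the data schema is derived once the early exit is gone: partition columns are filtered out of the user-specified schema using the session's resolver. A standalone sketch of that filtering, assuming a case-insensitive resolver (Spark picks the resolver from `spark.sql.caseSensitive`; the object and sample schemas are illustrative):

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DataSchemaSketch {
  // Stand-in for sparkSession.sessionState.conf.resolver (assumed
  // case-insensitive, the default when spark.sql.caseSensitive=false).
  val equality: (String, String) => Boolean = _.equalsIgnoreCase(_)

  def main(args: Array[String]): Unit = {
    val userSpecifiedSchema = StructType(Seq(
      StructField("value", StringType),
      StructField("Date", StringType)))  // partition column, different case
    val partitionSchema = StructType(Seq(StructField("date", StringType)))

    // Same expression as the context line above: drop every field that
    // matches a partition column under the resolver.
    val dataSchema = StructType(
      userSpecifiedSchema.filterNot(f =>
        partitionSchema.exists(p => equality(p.name, f.name))))

    println(dataSchema.fieldNames.mkString(", "))  // prints: value
  }
}
```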
```diff
@@ -434,26 +424,6 @@ case class DataSource(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
 
-    // If we are appending to a table that already exists, make sure the partitioning matches
-    // up. If we fail to load the table for whatever reason, ignore the check.
-    if (mode == SaveMode.Append) {
-      val existingPartitionColumns = Try {
-        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-      }.getOrElse(Seq.empty[String])
-      // TODO: Case sensitivity.
-      val sameColumns =
-        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-      if (existingPartitionColumns.nonEmpty && !sameColumns) {
-        throw new AnalysisException(
-          s"""Requested partitioning does not match existing partitioning.
-             |Existing partitioning columns:
-             |  ${existingPartitionColumns.mkString(", ")}
-             |Requested partitioning columns:
-             |  ${partitionColumns.mkString(", ")}
-             |""".stripMargin)
-      }
-    }
-
     // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
     // not need to have the query as child, to avoid to analyze an optimized query,
     // because InsertIntoHadoopFsRelationCommand will be optimized first.
```
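For reference, the deleted block can be restated as a standalone function. This is a sketch, not Spark API: the schema loader is passed in as a thunk, and `AnalysisException` is swapped for `IllegalArgumentException` to keep it dependency-free. Note what the original check cost: it called `getOrInferFileFormatSchema`, whose doc comment above describes its operations as expensive, on every append.

```scala
import scala.util.Try

object AppendCheckSketch {
  // The removed validation, restated: compare existing vs requested partition
  // columns case-insensitively and fail on mismatch; if the existing table's
  // schema cannot be loaded, skip the check (as the original comment said).
  def checkAppendPartitioning(
      loadExistingPartitionColumns: () => Seq[String],
      requestedPartitionColumns: Seq[String]): Unit = {
    val existing = Try(loadExistingPartitionColumns()).getOrElse(Seq.empty[String])
    val sameColumns =
      existing.map(_.toLowerCase()) == requestedPartitionColumns.map(_.toLowerCase())
    if (existing.nonEmpty && !sameColumns) {
      throw new IllegalArgumentException(  // Spark threw AnalysisException here
        s"""Requested partitioning does not match existing partitioning.
           |Existing partitioning columns: ${existing.mkString(", ")}
           |Requested partitioning columns: ${requestedPartitionColumns.mkString(", ")}
           |""".stripMargin)
    }
  }

  def main(args: Array[String]): Unit = {
    checkAppendPartitioning(() => Seq("Date"), Seq("date"))     // matches: no error
    checkAppendPartitioning(() => Seq("date"), Seq("country"))  // mismatch: throws
  }
}
```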