Skip to content

Commit

Permalink
[SPARK-6777] [SQL] Implements backwards compatibility rules in Cataly…
Browse files Browse the repository at this point in the history
…stSchemaConverter

This PR introduces `CatalystSchemaConverter` for converting Parquet schema to Spark SQL schema and vice versa.  Original conversion code in `ParquetTypesConverter` is removed. Benefits of the new version are:

1. When converting Spark SQL schemas, it generates standard Parquet schemas conforming to [the most updated Parquet format spec] [1]. Converting to old style Parquet schemas is also supported via feature flag `spark.sql.parquet.followParquetFormatSpec` (which is set to `false` for now, and should be set to `true` after both read and write paths are fixed).

   Note that although this version of Parquet format spec hasn't been officially release yet, Parquet MR 1.7.0 already sticks to it. So it should be safe to follow.

1. It implements backwards-compatibility rules described in the most updated Parquet format spec. Thus can recognize more schema patterns generated by other/legacy systems/tools.
1. Code organization follows convention used in [parquet-mr] [2], which is easier to follow. (Structure of `CatalystSchemaConverter` is similar to `AvroSchemaConverter`).

To fully implement backwards-compatibility rules in both read and write path, we also need to update `CatalystRowConverter` (which is responsible for converting Parquet records to `Row`s), `RowReadSupport`, and `RowWriteSupport`. These would be done in follow-up PRs.

TODO

- [x] More schema conversion test cases for legacy schema patterns.

[1]: https://github.com/apache/parquet-format/blob/ea095226597fdbecd60c2419d96b54b2fdb4ae6c/LogicalTypes.md
[2]: https://github.com/apache/parquet-mr/

Author: Cheng Lian <[email protected]>

Closes #6617 from liancheng/spark-6777 and squashes the following commits:

2a2062d [Cheng Lian] Don't convert decimals without precision information
b60979b [Cheng Lian] Adds a constructor which accepts a Configuration, and fixes default value of assumeBinaryIsString
743730f [Cheng Lian] Decimal scale shouldn't be larger than precision
a104a9e [Cheng Lian] Fixes Scala style issue
1f71d8d [Cheng Lian] Adds feature flag to allow falling back to old style Parquet schema conversion
ba84f4b [Cheng Lian] Fixes MapType schema conversion bug
13cb8d5 [Cheng Lian] Fixes MiMa failure
81de5b0 [Cheng Lian] Fixes UDT, workaround read path, and add tests
28ef95b [Cheng Lian] More AnalysisExceptions
b10c322 [Cheng Lian] Replaces require() with analysisRequire() which throws AnalysisException
cceaf3f [Cheng Lian] Implements backwards compatibility rules in CatalystSchemaConverter
  • Loading branch information
liancheng committed Jun 24, 2015
1 parent fb32c38 commit 8ab5076
Show file tree
Hide file tree
Showing 9 changed files with 1,297 additions and 422 deletions.
7 changes: 6 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.CatalystTimestampConverter"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.CatalystTimestampConverter$")
"org.apache.spark.sql.parquet.CatalystTimestampConverter$"),
// SPARK-6777 Implements backwards compatibility rules in CatalystSchemaConverter
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetTypeInfo"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetTypeInfo$")
)
case v if v.startsWith("1.4") =>
Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ package org.apache.spark.sql.types
import scala.reflect.runtime.universe.typeTag

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.ScalaReflectionLock
import org.apache.spark.sql.catalyst.expressions.Expression


/** Precision parameters for a Decimal */
case class PrecisionInfo(precision: Int, scale: Int)

case class PrecisionInfo(precision: Int, scale: Int) {
if (scale > precision) {
throw new AnalysisException(
s"Decimal scale ($scale) cannot be greater than precision ($precision).")
}
}

/**
* :: DeveloperApi ::
Expand Down
14 changes: 14 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,14 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "<TODO>")

val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
key = "spark.sql.parquet.followParquetFormatSpec",
defaultValue = Some(false),
doc = "Wether to stick to Parquet format specification when converting Parquet schema to " +
"Spark SQL schema and vice versa. Sticks to the specification if set to true; falls back " +
"to compatible mode if set to false.",
isPublic = false)

val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
key = "spark.sql.parquet.output.committer.class",
defaultValue = Some(classOf[ParquetOutputCommitter].getName),
Expand Down Expand Up @@ -498,6 +506,12 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
*/
private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

/**
* When set to true, sticks to Parquet format spec when converting Parquet schema to Spark SQL
* schema and vice versa. Otherwise, falls back to compatible mode.
*/
private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC)

/**
* When set to true, partition pruning for in-memory columnar tables is enabled.
*/
Expand Down
Loading

0 comments on commit 8ab5076

Please sign in to comment.