
[SPARK-6777] [SQL] Implements backwards compatibility rules in CatalystSchemaConverter #6617

Closed
liancheng wants to merge 11 commits

Conversation

liancheng (Contributor)

This PR introduces CatalystSchemaConverter for converting Parquet schemas to Spark SQL schemas and vice versa. The original conversion code in ParquetTypesConverter has been removed. The benefits of the new version are:

  1. When converting Spark SQL schemas, it generates standard Parquet schemas conforming to the latest Parquet format spec. Converting to old-style Parquet schemas is also supported via the feature flag spark.sql.parquet.followParquetFormatSpec, which is set to false for now and should be set to true once both the read and write paths are fixed (see the usage sketch after this section).

    Note that although this version of the Parquet format spec hasn't been officially released yet, parquet-mr 1.7.0 already adheres to it, so it should be safe to follow.

  2. It implements the backwards-compatibility rules described in the latest Parquet format spec, and can thus recognize more schema patterns generated by other and legacy systems/tools.

  3. Code organization follows the conventions used in parquet-mr, making it easier to follow. (The structure of CatalystSchemaConverter is similar to that of AvroSchemaConverter.)

To fully implement the backwards-compatibility rules in both the read and write paths, we also need to update CatalystRowConverter (which is responsible for converting Parquet records to Rows), RowReadSupport, and RowWriteSupport. These will be done in follow-up PRs.
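
For illustration, a minimal sketch of how the flag and the converter might be exercised from a Spark shell (`sqlContext` assumed in scope). The constructor argument and the two `convert` overloads are assumptions based on the description above, not verbatim API:

```scala
import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical usage sketch -- constructor arguments and `convert` overloads
// are assumed from the description above rather than copied from the real API.
sqlContext.setConf("spark.sql.parquet.followParquetFormatSpec", "true")

val catalystSchema = StructType(Seq(StructField("id", IntegerType, nullable = false)))
val converter = new CatalystSchemaConverter(followParquetFormatSpec = true)
val parquetSchema: MessageType = converter.convert(catalystSchema) // Catalyst -> Parquet
val roundTripped: StructType = converter.convert(parquetSchema)    // Parquet -> Catalyst
```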

TODO

  • More schema conversion test cases for legacy schema patterns.

@liancheng liancheng changed the title [SPARK-6777] [SQL] Implements backwards compatibility rules in CatalystSchemaConverter [SPARK-6777] [SQL] [WIP] Implements backwards compatibility rules in CatalystSchemaConverter Jun 3, 2015
SparkQA commented Jun 3, 2015

Test build #34103 has finished for PR 6617 at commit ef02156.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

liancheng (Contributor, Author)

The build failure was because I forgot to handle UDT in CatalystSchemaConverter.

SparkQA commented Jun 5, 2015

Test build #34279 has finished for PR 6617 at commit c31e2cd.

  • This patch fails MiMa tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@liancheng liancheng changed the title [SPARK-6777] [SQL] [WIP] Implements backwards compatibility rules in CatalystSchemaConverter [SPARK-6777] [SQL] Implements backwards compatibility rules in CatalystSchemaConverter Jun 5, 2015
liancheng (Contributor, Author)

Hey @rdblue, would you mind helping review CatalystSchemaConverter? This class closely resembles AvroSchemaConverter. I'd really like to have a Parquet guru review this part :) Thanks in advance!

SparkQA commented Jun 5, 2015

Test build #34280 has finished for PR 6617 at commit 909a7ea.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

liancheng (Contributor, Author)

@yhuai @rxin I still need to double-check compatibility with parquet-thrift, but this is already OK for review.

SparkQA commented Jun 5, 2015

Test build #34282 timed out for PR 6617 at commit b95b37c after a configured wait of 175m.

SparkQA commented Jun 5, 2015

Test build #884 has finished for PR 6617 at commit b95b37c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jun 8, 2015

Test build #34460 has finished for PR 6617 at commit 0bf8b64.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jun 10, 2015

Test build #34625 has finished for PR 6617 at commit 5a673d5.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • case class PrecisionInfo(precision: Int, scale: Int)

SparkQA commented Jun 10, 2015

Test build #34627 has finished for PR 6617 at commit b716d3f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class OverrideFunctionRegistry(underlying: FunctionRegistry) extends FunctionRegistry
    • abstract class LeafMathExpression(c: Double, name: String)
    • case class EulerNumber() extends LeafMathExpression(math.E, "E")
    • case class Pi() extends LeafMathExpression(math.Pi, "PI")
    • case class PrecisionInfo(precision: Int, scale: Int)

SparkQA commented Jun 11, 2015

Test build #34691 has finished for PR 6617 at commit abd4758.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PrecisionInfo(precision: Int, scale: Int)

liancheng (Contributor, Author)

The last build failure was actually caused by a bug in the test code (a decimal's scale shouldn't be larger than its precision), which was caught by the new schema converter.
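
For illustration (not part of the original comment), the invariant the converter enforces is that a decimal's scale must not exceed its precision:

```scala
import org.apache.spark.sql.types.DecimalType

// Valid: scale (2) <= precision (10).
val ok = DecimalType(10, 2)

// Invalid: scale (12) exceeds precision (10). Types like this are what the
// new schema converter now rejects, which surfaced the bug in the test code.
val bad = DecimalType(10, 12)
```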

SparkQA commented Jun 12, 2015

Test build #34770 has finished for PR 6617 at commit 5184cbc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PrecisionInfo(precision: Int, scale: Int)

SparkQA commented Jun 24, 2015

Test build #35666 has finished for PR 6617 at commit 277ddf4.

  • This patch fails MiMa tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • case class PrecisionInfo(precision: Int, scale: Int)

liancheng (Contributor, Author)

retest this please

SparkQA commented Jun 24, 2015

Test build #35668 has finished for PR 6617 at commit b60979b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PrecisionInfo(precision: Int, scale: Int)

SparkQA commented Jun 24, 2015

Test build #35715 has finished for PR 6617 at commit 2a2062d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PrecisionInfo(precision: Int, scale: Int)

liancheng (Contributor, Author)

I'm merging this to master since it blocks some other work (e.g. #6796).

@rdblue Please feel free to comment on this PR even though it's merged. I can address your comments in follow-up PRs.

@asfgit asfgit closed this in 8ab5076 Jun 24, 2015
@liancheng liancheng deleted the spark-6777 branch June 24, 2015 22:07
asfgit pushed a commit that referenced this pull request Jul 8, 2015
…ath for interoperability and backwards-compatibility

This PR is a follow-up to #6617 and is part of [SPARK-6774] [2], which aims to ensure interoperability and backwards-compatibility for Spark SQL Parquet support. This one fixes the read path. Spark SQL is now expected to be able to read legacy Parquet data files generated by most (if not all) common libraries/tools, such as parquet-thrift, parquet-avro, and parquet-hive. However, we still need to refactor the write path to write standard Parquet LISTs and MAPs ([SPARK-8848] [4]).

### Major changes

1. `CatalystConverter` class hierarchy refactoring

   - Replaces `CatalystConverter` trait with a much simpler `ParentContainerUpdater`.

     Now, instead of extending the original `CatalystConverter` trait, every converter class accepts an updater that is responsible for propagating the converted value to some parent container: for example, appending array elements to a parent array buffer, appending key-value pairs to a parent mutable map, or setting a converted value on some specific field of a parent row. The root converter doesn't have a parent and thus uses a `NoopUpdater` (see the sketch after this list).

     This simplifies the design since converters don't need to care about details of their parent converters anymore.

   - Unifies `CatalystRootConverter`, `CatalystGroupConverter` and `CatalystPrimitiveRowConverter` into `CatalystRowConverter`

     Specifically, now all row objects are represented by `SpecificMutableRow` during conversion.

   - Refactors `CatalystArrayConverter`, and removes `CatalystArrayContainsNullConverter` and `CatalystNativeArrayConverter`

     `CatalystNativeArrayConverter` was probably designed to avoid boxing costs. However, the way it uses Scala generics doesn't actually achieve this goal.

     The new `CatalystArrayConverter` handles both nullable and non-nullable array elements in a consistent way.

   - Implements backwards-compatibility rules in `CatalystArrayConverter`

     When Parquet records are being converted, the schema of the Parquet file should already have been verified, so we only need to care about the structure rather than the field names in the Parquet schema. Since all map objects represented in legacy systems have the same structure as the standard one (see the [backwards-compatibility rules for MAP] [1]), we only need to deal with LIST (namely array) in `CatalystArrayConverter`.

2. Requested columns handling

   When specifying requested columns in `RowReadSupport`, we used to use a Parquet `MessageType` converted from a Catalyst `StructType` containing all requested columns. This is not preferable when taking compatibility and interoperability into consideration, because the actual Parquet file may have a physical structure different from the converted schema.

   In this PR, the schema for requested columns is constructed using the following method:

   - For a column that exists in the target Parquet file, we extract the column type by name from the full file schema, and construct a single-field `MessageType` for that column.
   - For a column that doesn't exist in the target Parquet file, we create a single-field `StructType` and convert it to a `MessageType` using `CatalystSchemaConverter`.
   - Finally, we union all the single-field `MessageType`s into a full schema containing all requested fields (see the sketch after this list).

   With this change, we also fix [SPARK-6123] [3] by validating the global schema against each individual Parquet part-file.
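
A minimal sketch of the `ParentContainerUpdater` pattern described in item 1 above; the names mirror the description, but the signatures are illustrative assumptions rather than the actual Spark implementation:

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only; signatures are assumptions, not the real API.
trait ParentContainerUpdater {
  // Propagates a converted value into some parent container.
  def set(value: Any): Unit
}

// The root converter has no parent, so its updater does nothing.
object NoopUpdater extends ParentContainerUpdater {
  override def set(value: Any): Unit = ()
}

// Example: appends converted array elements to a parent array buffer.
class ArrayElementUpdater(buffer: ArrayBuffer[Any]) extends ParentContainerUpdater {
  override def set(value: Any): Unit = buffer += value
}
```

And a hedged sketch of the requested-schema assembly described in item 2, where `toParquet` is a hypothetical stand-in for `CatalystSchemaConverter`'s `StructType`-to-`MessageType` conversion:

```scala
import org.apache.parquet.schema.MessageType
import org.apache.spark.sql.types.{StructField, StructType}

// Illustrative sketch; `buildRequestedSchema` and `toParquet` are hypothetical
// names, not the actual Spark methods.
def buildRequestedSchema(
    requestedFields: Seq[StructField],
    fileSchema: MessageType,
    toParquet: StructType => MessageType): MessageType = {
  requestedFields.map { field =>
    if (fileSchema.containsField(field.name)) {
      // The column exists in the file: extract its physical type by name.
      new MessageType("root", fileSchema.getType(field.name))
    } else {
      // The column is missing: convert a single-field Catalyst schema instead.
      toParquet(StructType(field :: Nil))
    }
  }.reduce(_ union _) // Union all single-field schemas into the full schema.
}
```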

### Testing

This PR also adds compatibility tests for parquet-avro, parquet-thrift, and parquet-hive. Please refer to `README.md` under `sql/core/src/test` for more information about these tests. To avoid build-time code generation and extra complexity in the build system, the Java code generated from the test Thrift schema and Avro IDL is also checked in.

[1]: https://github.com/apache/incubator-parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
[2]: https://issues.apache.org/jira/browse/SPARK-6774
[3]: https://issues.apache.org/jira/browse/SPARK-6123
[4]: https://issues.apache.org/jira/browse/SPARK-8848

Author: Cheng Lian <[email protected]>

Closes #7231 from liancheng/spark-6776 and squashes the following commits:

360fe18 [Cheng Lian] Adds ParquetHiveCompatibilitySuite
c6fbc06 [Cheng Lian] Removes WIP file committed by mistake
b8c1295 [Cheng Lian] Excludes the whole parquet package from MiMa
598c3e8 [Cheng Lian] Adds extra Maven repo for hadoop-lzo, which is a transitive dependency of parquet-thrift
926af87 [Cheng Lian] Simplifies Parquet compatibility test suites
7946ee1 [Cheng Lian] Fixes Scala styling issues
3d7ab36 [Cheng Lian] Fixes .rat-excludes
a8f13bb [Cheng Lian] Using Parquet writer API to do compatibility tests
f2208cd [Cheng Lian] Adds README.md for Thrift/Avro code generation
1d390aa [Cheng Lian] Adds parquet-thrift compatibility test
440f7b3 [Cheng Lian] Adds generated files to .rat-excludes
13b9121 [Cheng Lian] Adds ParquetAvroCompatibilitySuite
06cfe9d [Cheng Lian] Adds comments about TimestampType handling
a099d3e [Cheng Lian] More comments
0cc1b37 [Cheng Lian] Fixes MiMa checks
884d3e6 [Cheng Lian] Fixes styling issue and reverts unnecessary changes
802cbd7 [Cheng Lian] Fixes bugs related to schema merging and empty requested columns
38fe1e7 [Cheng Lian] Adds explicit return type
7fb21f1 [Cheng Lian] Reverts an unnecessary debugging change
1781dff [Cheng Lian] Adds test case for SPARK-8811
6437d4b [Cheng Lian] Assembles requested schema from Parquet file schema
bcac49f [Cheng Lian] Removes the 16-byte restriction of decimals
a74fb2c [Cheng Lian] More comments
0525346 [Cheng Lian] Removes old Parquet record converters
03c3bd9 [Cheng Lian] Refactors Parquet read path to implement backwards-compatibility rules

CatalystSchemaConverter.analysisRequire(
  maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
  s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")
Contributor

It doesn't hurt to double-check this, but this will be verified by the builder when the Parquet Type is constructed from file metadata.

liancheng (Contributor, Author)

The benefit of this check is that it throws an AnalysisException, which can be specially handled to provide a better error message.
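
For illustration, a minimal sketch of what such a helper might look like; the actual signature in CatalystSchemaConverter may differ:

```scala
import org.apache.spark.sql.AnalysisException

// Sketch: like Predef.require, but throws an AnalysisException so that
// schema errors can be caught and reported with a friendlier message.
private def analysisRequire(condition: Boolean, message: String): Unit = {
  if (!condition) {
    throw new AnalysisException(message)
  }
}
```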
