-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-6123] [SPARK-6775] [SPARK-6776] [SQL] Refactors Parquet read path for interoperability and backwards-compatibility #7231
Conversation
Merged build triggered. |
Merged build started. |
val julianDay = buf.getInt | ||
updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although INT96
and nanosecond timestamp is being deprecated in Parquet, this PR doesn't implement TIMESTAMP_MICROS
, because parquet-mr 1.7.0 (the version we are currently using) doesn't have TIMESTAMP_MICROS
yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Should probably add a TODO comment there.)
Done
Test build #36553 has started for PR 7231 at commit |
Test build #36553 has finished for PR 7231 at commit
|
Merged build finished. Test FAILed. |
Merged build triggered. |
Merged build started. |
Test build #36554 has started for PR 7231 at commit |
Test build #36554 has finished for PR 7231 at commit
|
Merged build finished. Test FAILed. |
Merged build triggered. |
Merged build started. |
Test build #36557 has started for PR 7231 at commit |
Test build #36557 has finished for PR 7231 at commit
|
Merged build finished. Test PASSed. |
@rdblue As usual, you're always welcome to help reviewing this :-) As you suggested, the main part of this PR ( @scwf It would be nice if you can help testing this PR against your legacy Hive data. It should fix the Parquet compatibility issue you mentioned to me. Thanks! @Sephiroth-Lin Also would like to know whether this fixes your issue behind SPARK-8811. Thanks! |
@liancheng, thanks! I'll make some time to look at it this week. Most likely Wednesday. |
Merged build triggered. |
Merged build started. |
Test build #36593 has started for PR 7231 at commit |
Test build #36593 has finished for PR 7231 at commit
|
Merged build finished. Test PASSed. |
Build triggered. |
Build started. |
Test build #36672 has started for PR 7231 at commit |
Merged build triggered. |
Merged build started. |
Test build #36767 has started for PR 7231 at commit |
Test build #36767 has finished for PR 7231 at commit
|
Merged build finished. Test FAILed. |
Merged build triggered. |
Merged build started. |
Test build #36769 has started for PR 7231 at commit |
Test build #36769 has finished for PR 7231 at commit
|
Merged build finished. Test PASSed. |
Merged build triggered. |
Merged build started. |
Test build #36819 has started for PR 7231 at commit |
Will probably merge this soon after Jenkins says OK as it blocks other work. More compatibility tests need to be done, but I'd like to leave them to follow-up PRs (ordered by importance):
|
Test build #36819 has finished for PR 7231 at commit
|
Merged build finished. Test PASSed. |
…ndencies These two dependencies were introduced in #7231 to help testing Parquet compatibility with `parquet-thrift`. However, they somehow crash the Scala compiler in Maven builds. This PR fixes this issue by: 1. Removing these two dependencies, and 2. Instead of generating the testing Parquet file programmatically, checking in an actual testing Parquet file generated by `parquet-thrift` as a test resource. This is just a quick fix to bring back Maven builds. Need to figure out the root case as binary Parquet files are harder to maintain. Author: Cheng Lian <[email protected]> Closes #7330 from liancheng/spark-8959 and squashes the following commits: cf69512 [Cheng Lian] Brings back Maven builds
This PR can be quite challenging to review. I'm trying to give a detailed description of the problem as well as its solution here. When reading Parquet files, we need to specify a potentially nested Parquet schema (of type `MessageType`) as requested schema for column pruning. This Parquet schema is translated from a Catalyst schema (of type `StructType`), which is generated by the query planner and represents all requested columns. However, this translation can be fairly complicated because of several reasons: 1. Requested schema must conform to the real schema of the physical file to be read. This means we have to tailor the actual file schema of every individual physical Parquet file to be read according to the given Catalyst schema. Fortunately we are already doing this in Spark 1.5 by pushing request schema conversion to executor side in PR #7231. 1. Support for schema merging. A single Parquet dataset may consist of multiple physical Parquet files come with different but compatible schemas. This means we may request for a column path that doesn't exist in a physical Parquet file. All requested column paths can be nested. For example, for a Parquet file schema ``` message root { required group f0 { required group f00 { required int32 f000; required binary f001 (UTF8); } } } ``` we may request for column paths defined in the following schema: ``` message root { required group f0 { required group f00 { required binary f001 (UTF8); required float f002; } } optional double f1; } ``` Notice that we pruned column path `f0.f00.f000`, but added `f0.f00.f002` and `f1`. The good news is that Parquet handles non-existing column paths properly and always returns null for them. 1. The map from `StructType` to `MessageType` is a one-to-many map. This is the most unfortunate part. Due to historical reasons (dark histories!), schemas of Parquet files generated by different libraries have different "flavors". For example, to handle a schema with a single non-nullable column, whose type is an array of non-nullable integers, parquet-protobuf generates the following Parquet schema: ``` message m0 { repeated int32 f; } ``` while parquet-avro generates another version: ``` message m1 { required group f (LIST) { repeated int32 array; } } ``` and parquet-thrift spills this: ``` message m1 { required group f (LIST) { repeated int32 f_tuple; } } ``` All of them can be mapped to the following _unique_ Catalyst schema: ``` StructType( StructField( "f", ArrayType(IntegerType, containsNull = false), nullable = false)) ``` This greatly complicates Parquet requested schema construction, since the path of a given column varies in different cases. To read the array elements from files with the above schemas, we must use `f` for `m0`, `f.array` for `m1`, and `f.f_tuple` for `m2`. In earlier Spark versions, we didn't try to fix this issue properly. Spark 1.4 and prior versions simply translate the Catalyst schema in a way more or less compatible with parquet-hive and parquet-avro, but is broken in many other cases. Earlier revisions of Spark 1.5 only try to tailor the Parquet file schema at the first level, and ignore nested ones. This caused [SPARK-10301] [spark-10301] as well as [SPARK-10005] [spark-10005]. In PR #8228, I tried to avoid the hard part of the problem and made a minimum change in `CatalystRowConverter` to fix SPARK-10005. However, when taking SPARK-10301 into consideration, keeping hacking `CatalystRowConverter` doesn't seem to be a good idea. So this PR is an attempt to fix the problem in a proper way. For a given physical Parquet file with schema `ps` and a compatible Catalyst requested schema `cs`, we use the following algorithm to tailor `ps` to get the result Parquet requested schema `ps'`: For a leaf column path `c` in `cs`: - if `c` exists in `cs` and a corresponding Parquet column path `c'` can be found in `ps`, `c'` should be included in `ps'`; - otherwise, we convert `c` to a Parquet column path `c"` using `CatalystSchemaConverter`, and include `c"` in `ps'`; - no other column paths should exist in `ps'`. Then comes the most tedious part: > Given `cs`, `ps`, and `c`, how to locate `c'` in `ps`? Unfortunately, there's no quick answer, and we have to enumerate all possible structures defined in parquet-format spec. They are: 1. the standard structure of nested types, and 1. cases defined in all backwards-compatibility rules for `LIST` and `MAP`. The core part of this PR is `CatalystReadSupport.clipParquetType()`, which tailors a given Parquet file schema according to a requested schema in its Catalyst form. Backwards-compatibility rules of `LIST` and `MAP` are covered in `clipParquetListType()` and `clipParquetMapType()` respectively. The column path selection algorithm is implemented in `clipParquetGroupFields()`. With this PR, we no longer need to do schema tailoring in `CatalystReadSupport` and `CatalystRowConverter`. Another benefit is that, now we can also read Parquet datasets consist of files with different physical Parquet schema but share the same logical schema, for example, files generated by different Parquet libraries. This situation is illustrated by [this test case] [test-case]. [spark-10301]: https://issues.apache.org/jira/browse/SPARK-10301 [spark-10005]: https://issues.apache.org/jira/browse/SPARK-10005 [test-case]: liancheng@38644d8#diff-a9b98e28ce3ae30641829dffd1173be2R26 Author: Cheng Lian <[email protected]> Closes #8509 from liancheng/spark-10301/fix-parquet-requested-schema.
This PR is a follow-up of #6617 and is part of SPARK-6774, which aims to ensure interoperability and backwards-compatibility for Spark SQL Parquet support. And this one fixes the read path. Now Spark SQL is expected to be able to read legacy Parquet data files generated by most (if not all) common libraries/tools like parquet-thrift, parquet-avro, and parquet-hive. However, we still need to refactor the write path to write standard Parquet LISTs and MAPs (SPARK-8848).
Major changes
CatalystConverter
class hierarchy refactoringReplaces
CatalystConverter
trait with a much simplerParentContainerUpdater
.Now instead of extending the original
CatalystConverter
trait, every converter class accepts an updater which is responsible for propagating the converted value to some parent container. For example, appending array elements to a parent array buffer, appending a key-value pairs to a parent mutable map, or setting a converted value to some specific field of a parent row. Root converter doesn't have a parent and thus uses aNoopUpdater
.This simplifies the design since converters don't need to care about details of their parent converters anymore.
Unifies
CatalystRootConverter
,CatalystGroupConverter
andCatalystPrimitiveRowConverter
intoCatalystRowConverter
Specifically, now all row objects are represented by
SpecificMutableRow
during conversion.Refactors
CatalystArrayConverter
, and removesCatalystArrayContainsNullConverter
andCatalystNativeArrayConverter
CatalystNativeArrayConverter
was probably designed with the intention of avoiding boxing costs. However, the way it uses Scala generics actually doesn't achieve this goal.The new
CatalystArrayConverter
handles both nullable and non-nullable array elements in a consistent way.Implements backwards-compatibility rules in
CatalystArrayConverter
When Parquet records are being converted, schema of Parquet files should have already been verified. So we only need to care about the structure rather than field names in the Parquet schema. Since all map objects represented in legacy systems have the same structure as the standard one (see backwards-compatibility rules for MAP), we only need to deal with LIST (namely array) in
CatalystArrayConverter
.Requested columns handling
When specifying requested columns in
RowReadSupport
, we used to use a ParquetMessageType
converted from a CatalystStructType
which contains all requested columns. This is not preferable when taking compatibility and interoperability into consideration. Because the actual Parquet file may have different physical structure from the converted schema.In this PR, the schema for requested columns is constructed using the following method:
MessageType
for that column.StructType
and convert it to aMessageType
usingCatalystSchemaConverter
.MessageType
s into a full schema containing all requested fieldsWith this change, we also fix SPARK-6123 by validating the global schema against each individual Parquet part-files.
Testing
This PR also adds compatibility tests for parquet-avro, parquet-thrift, and parquet-hive. Please refer to
README.md
undersql/core/src/test
for more information about these tests. To avoid build time code generation and adding extra complexity to the build system, Java code generated from testing Thrift schema and Avro IDL is also checked in.