[SPARK-27418][SQL] Migrate Parquet to File Data Source V2 #24327
Conversation
This is ready for review. @cloud-fan @HyukjinKwon @dongjoon-hyun
The test case "org.apache.spark.sql.streaming.FileStreamSinkSuite.writing with aggregation" becomes flaky with this PR.
retest this please.
}

private def createRowBaseReader(file: PartitionedFile): ParquetRecordReader[UnsafeRow] = {
  buildReaderBase(file, createRowBaseReader0).asInstanceOf[ParquetRecordReader[UnsafeRow]]
buildReaderBase is parameterized, but the result is still cast. Why not parameterize it so that it returns ParquetRecordReader[UnsafeRow] and avoid the cast? I think these casts should be removed.
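A minimal sketch of what that suggestion could look like, reusing the names from the snippet above; the real buildReaderBase takes more parameters, so this only illustrates the typing idea, not the final implementation:

import org.apache.hadoop.mapreduce.RecordReader

// Sketch: give buildReaderBase a type parameter so the concrete reader type
// flows through to the call sites and the asInstanceOf casts can be dropped.
private def buildReaderBase[T](
    file: PartitionedFile,
    createReader: PartitionedFile => RecordReader[Void, T]): RecordReader[Void, T] = {
  // shared setup (file split, Hadoop conf, pushed filters) would go here
  createReader(file)
}

// Call site without the cast; the return type generalizes to RecordReader[Void, UnsafeRow].
private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, UnsafeRow] =
  buildReaderBase(file, createRowBaseReader0)

The vectorized path still needs the concrete VectorizedParquetRecordReader type (see the later thread), so that call site keeps its cast.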
// The actual filter push down happens in [[ParquetPartitionReaderFactory]].
// It requires the Parquet physical schema to determine whether a filter is convertible.
// So here we simply mark that all the filters are pushed down.
This comment isn't correct. All filters that can be converted to Parquet are pushed down.
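A hedged sketch of the corrected behavior in a DSv2 scan builder, using the pushedParquetFilters name suggested later in this review; the surrounding class and its filters field are assumed:

import org.apache.spark.sql.sources.Filter

// Sketch: accept all filters (Spark still re-evaluates them after the scan),
// but report as "pushed" only those that ParquetFilters can actually convert.
override def pushFilters(filters: Array[Filter]): Array[Filter] = {
  this.filters = filters
  filters   // everything remains a post-scan filter for correctness
}

override def pushedFilters(): Array[Filter] = pushedParquetFilters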
    paths: Seq[String],
    userSpecifiedSchema: Option[StructType],
    fallbackFileFormat: Class[_ <: FileFormat])
  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
Looks like this will also hit SPARK-27960. I think this is okay for now. No need to block Parquet to fix it.
However, it would be good to follow up with a suite of SQL tests for each v2 implementation that validates overall behavior, like reporting the metastore schema after a table is created.
val committerClass =
  conf.getClass(
    SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
Does v2 also use Parquet _metadata files?
I think it is disabled by default
// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

// SPARK-15719: Disables writing Parquet summary files by default.
If they are disabled by default in v1, why allow writing them in v2?
I think the behavior in V1 and V2 is the same: by default, "parquet.summary.metadata.level" is set to "NONE" and no summary file is written. If the user sets "parquet.summary.metadata.level" and spark.sql.parquet.output.committer.class is set correctly, then the summary file will be written.
See: https://issues.apache.org/jira/browse/SPARK-15719
Why should v2 support deprecated metadata files?
I think it is consistent with V1 here. The value of parquet.summary.metadata.level is ALL by default. As per SPARK-15719, we should set it to NONE by default in Spark. If users explicitly set parquet.summary.metadata.level to ALL or COMMON_ONLY, Spark should write the metadata files.
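A hedged sketch of that default, applied to the writer's Hadoop configuration; the exact guard in the actual code may differ slightly:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetOutputFormat

// Sketch: only force NONE when the user has not chosen a summary level via either
// the current key or the deprecated boolean flag, so an explicit ALL/COMMON_ONLY
// (together with a compatible output committer) still writes _metadata files,
// matching the V1 behavior described above (SPARK-15719).
def disableSummaryFilesByDefault(conf: Configuration): Unit = {
  val levelSetByUser = conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) != null
  val legacyFlagSetByUser = conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) != null
  if (!levelSetByUser && !legacyFlagSetByUser) {
    conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "NONE")
  }
}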
@rdblue Thanks for the review. I have addressed all your comments. Any other concerns?
retest this please.
retest this please.
@dongjoon-hyun Would you help do a final review and merge this one? Thanks!
private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = {
  val vectorizedReader =
    buildReaderBase(file, createVectorizedReader0).asInstanceOf[VectorizedParquetRecordReader]
@gengliangwang, why is this cast here? I expected it to be removed when the one in createRowBaseReader was removed.
This is because we need to call the initBatch and enableReturningBatches methods of VectorizedParquetRecordReader, so we can't just change the return type to RecordReader[Void, Object] here.
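A hedged sketch of that explanation; the setup calls and field names (partitionSchema, returningBatch) are assumed for illustration, not taken from the final code:

// Sketch: the vectorized path needs VectorizedParquetRecordReader-specific calls
// (initBatch, enableReturningBatches), so a generic RecordReader[Void, Object]
// return type is not enough here and the cast stays.
private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = {
  val vectorizedReader =
    buildReaderBase(file, createVectorizedReader0).asInstanceOf[VectorizedParquetRecordReader]
  vectorizedReader.initBatch(partitionSchema, file.partitionValues)  // columnar batch setup
  if (returningBatch) {
    vectorizedReader.enableReturningBatches()
  }
  vectorizedReader
}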
  vectorizedReader
}

private def createVectorizedReader0(
No biggie, but I'd rename it to createParquetVectorizedReader.
  sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
}

lazy val _pushedFilters = {
Not a big deal here either, but I'd rename it to pushedParquetFilters.
    new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
  val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
    pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
  parquetFilters.convertibleFilters(this.filters).toArray
Sorry if I missed some context. What's the difference between ParquetFilters.convertibleFilters and ParquetFilters.createFilters? The logic seems duplicated.
ParquetFilters.convertibleFilters returns Seq[org.apache.spark.sql.sources.Filter], while ParquetFilters.createFilters returns org.apache.parquet.filter2.predicate.FilterPredicate. The two methods only overlap in handling the And/Or/Not operators.
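A hedged sketch of how the two passes could be used together, based on the return types described above; the per-filter createFilter variant and its Option return type are assumptions for illustration:

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.sources.Filter

// Sketch: the "convertible" pass feeds pushedFilters()/explain output, while the
// "create" pass builds the Parquet predicate actually evaluated at read time.
// Only the And/Or/Not recursion is logic the two passes share.
def pushedParquetFilters(parquetFilters: ParquetFilters, filters: Seq[Filter]): Array[Filter] =
  parquetFilters.convertibleFilters(filters).toArray

def readPredicate(parquetFilters: ParquetFilters, filters: Seq[Filter]): Option[FilterPredicate] =
  filters
    .flatMap(f => parquetFilters.createFilter(f))      // assumed: returns Option[FilterPredicate]
    .reduceOption((l, r) => FilterApi.and(l, r))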
Merged to master. For #24327 (comment), I think it's confusing. Let me take a look and see if I can make it simpler separately.
@dongjoon-hyun, @rdblue, @cloud-fan, let me know if there are any major comments to address that I missed. If that's not easily fixable, I don't mind reverting it as well.
@dongjoon-hyun @rdblue @cloud-fan @mallman @HyukjinKwon @gatorsmile @jaceklaskowski Thanks for the review!
  assertDF(df)

  // TODO: fix file source V2 as well.
  withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
how is this related to parquet?
[info] Decoded objects do not match expected objects:
[info] expected: WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
[info] actual: WrappedArray(9, 0, 10, 1, 2, 8, 3, 6, 7, 5, 4)
[info] assertnotnull(upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long"))
[info] +- upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long")
[info] +- getcolumnbyordinal(0, LongType) (QueryTest.scala:70)
We need to fix the read path for streaming output.
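A hedged sketch of the temporary workaround shown in the diff above, assuming it runs inside a QueryTest-style suite (so withSQLConf, checkDataset, and the test implicits are available) and that outputDir is the file sink's output path:

// Sketch: pin the ordering-sensitive check to the V1 Parquet reader until the V2
// streaming read path is fixed (see the TODO in the diff above).
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
  val result = spark.read.parquet(outputDir).as[Long]   // outputDir: assumed sink path
  checkDataset(result, (0L to 10L): _*)
}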
What changes were proposed in this pull request?
Migrate Parquet to File Data Source V2
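For orientation, a hedged sketch of the V2 entry point this migration adds, inferred from the ParquetDataSourceV2.scala path and the FileTable snippet discussed above; the helper names and the exact getTable signature are assumptions, not the final code:

// Sketch: the V2 source keeps the "parquet" short name and declares the V1
// ParquetFileFormat as its fallback, while table/scan/write logic lives in
// ParquetTable, the scan builder, and ParquetPartitionReaderFactory.
class ParquetDataSourceV2 extends FileDataSourceV2 {

  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]

  override def shortName(): String = "parquet"

  override def getTable(options: CaseInsensitiveStringMap): Table = {
    val paths = getPaths(options)          // assumed helper on FileDataSourceV2
    val tableName = getTableName(paths)    // assumed helper on FileDataSourceV2
    ParquetTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
  }
}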
How was this patch tested?
Unit tests.