
[SPARK-27418][SQL] Migrate Parquet to File Data Source V2 #24327

Closed
wants to merge 4 commits

Conversation

gengliangwang
Member

What changes were proposed in this pull request?

Migrate Parquet to File Data Source V2

How was this patch tested?

Unit test

@SparkQA

SparkQA commented Apr 9, 2019

Test build #104442 has finished for PR 24327 at commit 27e602f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class ParquetLogRedirector implements Serializable
  • class ParquetFilters(
  • class ParquetOutputWriter(path: String, context: TaskAttemptContext)
  • class ParquetReadSupport(val convertTz: Option[TimeZone],
  • case class FileTypes(
  • class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging
  • class ParquetDataSourceV2 extends FileDataSourceV2
  • case class ParquetPartitionReaderFactory(
  • case class ParquetScan(
  • case class ParquetScanBuilder(
  • case class ParquetTable(
  • class ParquetWriteBuilder(

@SparkQA

SparkQA commented Apr 11, 2019

Test build #104503 has finished for PR 24327 at commit 336ac92.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2019

Test build #104550 has finished for PR 24327 at commit 99b8575.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 17, 2019

Test build #104662 has finished for PR 24327 at commit cf6837c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 23, 2019

Test build #104816 has finished for PR 24327 at commit ab70c37.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 23, 2019

Test build #104823 has finished for PR 24327 at commit b3b04b0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 23, 2019

Test build #104841 has finished for PR 24327 at commit 138344e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

gengliangwang changed the title from "[WIP][SPARK-27418][SQL] Migrate Parquet to File Data Source V2" to "[SPARK-27418][SQL] Migrate Parquet to File Data Source V2" on Apr 26, 2019
@gengliangwang
Member Author

This is ready for review. @cloud-fan @HyukjinKwon @dongjoon-hyun
I will update test cases related to sameResult after #24475 is merged.

@SparkQA

SparkQA commented Apr 27, 2019

Test build #104948 has finished for PR 24327 at commit 30d88cb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 29, 2019

Test build #105005 has finished for PR 24327 at commit c6e602f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 30, 2019

Test build #105040 has finished for PR 24327 at commit 18fda4f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 30, 2019

Test build #105042 has finished for PR 24327 at commit 104aaa1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

The test case "org.apache.spark.sql.streaming.FileStreamSinkSuite.writing with aggregation" becomes flaky with this PR.
@cloud-fan @jose-torres Any idea about that?

@gengliangwang
Member Author

retest this please.

}

private def createRowBaseReader(file: PartitionedFile): ParquetRecordReader[UnsafeRow] = {
buildReaderBase(file, createRowBaseReader0).asInstanceOf[ParquetRecordReader[UnsafeRow]]
Contributor

buildReaderBase is parameterized, but the result is still cast. Why not parameterize it so that it returns ParquetRecordReader[UnsafeRow] and avoid the cast? I think these casts should be removed.
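A sketch of this suggestion, with the parameter list heavily simplified (the real buildReaderBase takes more arguments, and createRowBaseReader0 is this PR's row-reader builder):

private def buildReaderBase[T](
    file: PartitionedFile,
    newReader: PartitionedFile => T): T = {
  // shared setup (file split, Hadoop conf, task attempt context) omitted in this sketch
  newReader(file)
}

private def createRowBaseReader(file: PartitionedFile): ParquetRecordReader[UnsafeRow] =
  buildReaderBase(file, createRowBaseReader0)  // no asInstanceOf needed once T is inferred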


// The actual filter push down happens in [[ParquetPartitionReaderFactory]].
// It requires the Parquet physical schema to determine whether a filter is convertible.
// So here we simply mark that all the filters are pushed down.
Contributor

This comment isn't correct. All filters that can be converted to Parquet are pushed down.
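To restate that in code, a hedged sketch of the SupportsPushDownFilters contract as it applies here (_pushedFilters is the lazy val of convertible filters shown further down in this diff; the exact overrides in ParquetScanBuilder may differ):

override def pushFilters(filters: Array[Filter]): Array[Filter] = {
  this.filters = filters  // everything Spark offered for pushdown
  filters                 // filters returned here are still evaluated by Spark after the scan
}

// Only the subset that ParquetFilters can convert is reported as pushed.
override def pushedFilters(): Array[Filter] = _pushedFilters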

paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
Contributor

Looks like this will also hit SPARK-27960. I think that is okay for now; there is no need to block the Parquet migration on fixing it.

However, it would be good to follow up with a suite of SQL tests for each v2 implementation that validates overall behavior, like reporting the metastore schema after a table is created.


val committerClass =
conf.getClass(
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
Contributor

Does v2 also use Parquet _metadata files?

Member Author

I think it is disabled by default

// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

// SPARK-15719: Disables writing Parquet summary files by default.
Contributor

If they are disabled by default in v1, why allow writing them in v2?

Member Author

I think the behavior in V1 and V2 is the same: by default, "parquet.summary.metadata.level" is set to "NONE" and no summary file is written. If the user sets "parquet.summary.metadata.level" and spark.sql.parquet.output.committer.class is configured correctly, then the summary file is written.
See: https://issues.apache.org/jira/browse/SPARK-15719

Contributor

Why should v2 support deprecated metadata files?

Member Author

I think it is consistent with V1 here.
The value of parquet.summary.metadata.level is ALL by default in Parquet; as per SPARK-15719, we should set it to NONE by default in Spark.
If users explicitly set parquet.summary.metadata.level to ALL or COMMON_ONLY, Spark should write the metadata files.
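A minimal sketch of that opt-in from the user side (the DataFrame and output path are placeholders; both keys are the ones discussed above and in SPARK-15719):

import spark.implicits._  // assumes an active SparkSession named `spark`

spark.sparkContext.hadoopConfiguration
  .set("parquet.summary.metadata.level", "ALL")  // Spark otherwise defaults this to NONE
spark.conf.set("spark.sql.parquet.output.committer.class",
  "org.apache.parquet.hadoop.ParquetOutputCommitter")  // summary files are only written by this committer
Seq(1L, 2L, 3L).toDF("id")
  .write.mode("overwrite")
  .parquet("/tmp/parquet-summary-example")  // _metadata / _common_metadata are written alongside the data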

@SparkQA

SparkQA commented Jun 7, 2019

Test build #106277 has finished for PR 24327 at commit 7d3a568.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

@rdblue Thanks for the review. I have addressed all your comments. Any other concerns?

@gengliangwang
Member Author

retest this please.

@SparkQA

SparkQA commented Jun 11, 2019

Test build #106380 has finished for PR 24327 at commit 7d3a568.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

retest this please.

@SparkQA

SparkQA commented Jun 11, 2019

Test build #106391 has finished for PR 24327 at commit 7d3a568.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member Author

@dongjoon-hyun Would you help do a final review and merge this one? Thanks!


private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = {
val vectorizedReader =
buildReaderBase(file, createVectorizedReader0).asInstanceOf[VectorizedParquetRecordReader]
Contributor

@gengliangwang, why is this cast here? I expected it to be removed when the one in createRowBaseReader was removed.

Member Author

This is because we need to call the initBatch and enableReturningBatches methods of VectorizedParquetRecordReader here, so we can't simply change the return type to RecordReader[Void, Object].
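Roughly, the two calls in question are what force the concrete type (a simplified sketch; returningBatch stands in for the factory's batch flag):

val vectorizedReader = buildReaderBase(file, createVectorizedReader0)
  .asInstanceOf[VectorizedParquetRecordReader]
// Both calls below exist only on VectorizedParquetRecordReader, not on the
// generic RecordReader[Void, Object] that buildReaderBase would otherwise return.
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
  vectorizedReader.enableReturningBatches()
}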

vectorizedReader
}

private def createVectorizedReader0(
Member

No biggie, but I'd rename it to createParquetVectorizedReader.

sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
}

lazy val _pushedFilters = {
Member

Not a big deal here either, but I'd rename it to pushedParquetFilters.

new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
parquetFilters.convertibleFilters(this.filters).toArray
HyukjinKwon (Member) commented Jun 14, 2019

Sorry if I missed some context. What's the difference between ParquetFilters.convertibleFilters and ParquetFilters.createFilters? It seems like the logic is duplicated.

Member Author

ParquetFilters.convertibleFilters returns Seq[org.apache.spark.sql.sources.Filter]
ParquetFilters.createFilters returns org.apache.parquet.filter2.predicate.FilterPredicate

The two methods only overlap in their handling of the And/Or/Not operators.
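Illustrating that overlap with a sketch (not the PR's code: isConvertibleLeaf is a hypothetical leaf check, and the real logic also handles partial pushdown of And):

def canConvert(f: sources.Filter): Boolean = f match {
  case sources.And(left, right) => canConvert(left) && canConvert(right)
  case sources.Or(left, right)  => canConvert(left) && canConvert(right)
  case sources.Not(child)       => canConvert(child)
  case leaf                     => isConvertibleLeaf(leaf)  // hypothetical: checks the leaf's type against the Parquet schema
}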

@SparkQA

SparkQA commented Jun 14, 2019

Test build #106522 has finished for PR 24327 at commit f658e92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

For #24327 (comment), I think it's confusing. Let me take a look and see if I can make it simpler separately.

@HyukjinKwon
Member

@dongjoon-hyun, @rdblue, @cloud-fan, let me know if there are any major comments to address that I missed. If that's not easily fixable, I don't mind reverting it as well.

@gengliangwang
Member Author

assertDF(df)
// TODO: fix file source V2 as well.
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
Contributor

how is this related to parquet?

gengliangwang (Member, Author) commented Jun 17, 2019

[info]   Decoded objects do not match expected objects:
[info]   expected: WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
[info]   actual:   WrappedArray(9, 0, 10, 1, 2, 8, 3, 6, 7, 5, 4)
[info]   assertnotnull(upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long"))
[info]   +- upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long")
[info]      +- getcolumnbyordinal(0, LongType) (QueryTest.scala:70)

We need to fix the read path for streaming output.
