Commit f658e92: address comments
gengliangwang committed Jun 14, 2019
1 parent 7d3a568
Showing 2 changed files with 8 additions and 8 deletions.
ParquetPartitionReaderFactory.scala
@@ -177,10 +177,10 @@ case class ParquetPartitionReaderFactory(
   }
 
   private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, UnsafeRow] = {
-    buildReaderBase(file, createRowBaseReader0)
+    buildReaderBase(file, createRowBaseParquetReader)
   }
 
-  private def createRowBaseReader0(
+  private def createRowBaseParquetReader(
       split: ParquetInputSplit,
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
@@ -203,13 +203,13 @@ case class ParquetPartitionReaderFactory(
   }
 
   private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = {
-    val vectorizedReader =
-      buildReaderBase(file, createVectorizedReader0).asInstanceOf[VectorizedParquetRecordReader]
+    val vectorizedReader = buildReaderBase(file, createParquetVectorizedReader)
+      .asInstanceOf[VectorizedParquetRecordReader]
     vectorizedReader.initBatch(partitionSchema, file.partitionValues)
     vectorizedReader
   }
 
-  private def createVectorizedReader0(
+  private def createParquetVectorizedReader(
       split: ParquetInputSplit,
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
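Both renamed helpers in this file are arguments to a shared buildReaderBase, which performs the common per-file setup and delegates the actual reader construction to the function it receives. Below is a minimal, self-contained sketch of that higher-order pattern; the type names, signatures, and bodies are simplified stand-ins for illustration, not the actual Spark implementation.

// Simplified sketch: a shared buildReaderBase does the common per-file work
// and delegates reader construction to whichever builder function it is given.
// All names below (FileSlice, Reader, etc.) are illustrative stand-ins.
object ReaderFactorySketch {

  final case class FileSlice(path: String, start: Long, length: Long)

  trait Reader[T] {
    def nextKeyValue(): Boolean
    def getCurrentValue: T
  }

  // Common setup lives here; the specific reader comes from `buildReader`.
  def buildReaderBase[T](file: FileSlice, buildReader: FileSlice => Reader[T]): Reader[T] = {
    // ...shared work (configuration, split computation, filters) would go here...
    buildReader(file)
  }

  // Analogous to createRowBaseParquetReader: builds one concrete kind of reader.
  def createRowBaseParquetReader(file: FileSlice): Reader[String] =
    new Reader[String] {
      private var consumed = false
      def nextKeyValue(): Boolean = { val more = !consumed; consumed = true; more }
      def getCurrentValue: String = s"row from ${file.path}"
    }

  def main(args: Array[String]): Unit = {
    val reader = buildReaderBase(FileSlice("part-0.parquet", 0L, 128L), createRowBaseParquetReader)
    while (reader.nextKeyValue()) println(reader.getCurrentValue)
  }
}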
ParquetScanBuilder.scala
@@ -41,7 +41,7 @@ case class ParquetScanBuilder(
     sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
   }
 
-  lazy val _pushedFilters = {
+  lazy val pushedParquetFilters = {
     val sqlConf = sparkSession.sessionState.conf
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
@@ -66,10 +66,10 @@ case class ParquetScanBuilder(
   // Note: for Parquet, the actual filter push down happens in [[ParquetPartitionReaderFactory]].
   // It requires the Parquet physical schema to determine whether a filter is convertible.
   // All filters that can be converted to Parquet are pushed down.
-  override def pushedFilters(): Array[Filter] = _pushedFilters
+  override def pushedFilters(): Array[Filter] = pushedParquetFilters
 
   override def build(): Scan = {
     ParquetScan(sparkSession, hadoopConf, fileIndex, dataSchema, readDataSchema(),
-      readPartitionSchema(), _pushedFilters, options)
+      readPartitionSchema(), pushedParquetFilters, options)
   }
 }
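The note in ParquetScanBuilder describes the split of responsibilities: the builder only records which filters are convertible and reports them through pushedFilters(), while the row-level evaluation happens later in ParquetPartitionReaderFactory. Here is a small sketch of that shape, using simplified stand-in types rather than the real DataSource V2 interfaces.

// Sketch of the pushdown bookkeeping: the accepted filters are computed once
// (the field renamed to pushedParquetFilters in this commit), exposed through
// pushedFilters(), and handed to the Scan that build() produces. Stand-in types only.
object ScanBuilderSketch {

  final case class Filter(column: String, value: Any)
  final case class Scan(pushedFilters: Seq[Filter])

  final class ParquetScanBuilderLike(requested: Seq[Filter]) {
    // Stand-in for the real convertibility check against the Parquet physical schema.
    lazy val pushedParquetFilters: Seq[Filter] = requested.filter(_.column.nonEmpty)

    def pushedFilters(): Seq[Filter] = pushedParquetFilters

    def build(): Scan = Scan(pushedParquetFilters)
  }

  def main(args: Array[String]): Unit = {
    val builder = new ParquetScanBuilderLike(Seq(Filter("id", 1), Filter("", 2)))
    println(builder.pushedFilters()) // List(Filter(id,1))
    println(builder.build())         // Scan(List(Filter(id,1)))
  }
}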
