[SPARK-6016][SQL] Cannot read the parquet table after overwriting the existing table when spark.sql.parquet.cacheMetadata=true #4775
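
A minimal reproduction sketch of the reported failure, adapted from the regression test added below; the table name spark_6016_repro is illustrative, and the snippet assumes the TestHive session (sql, jsonRDD, sparkContext) and the saveAsTable API used by that test:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.test.TestHive._

// Write a table backed by two parquet files and read it once, so its two footers
// end up in the footer cache (spark.sql.parquet.cacheMetadata defaults to true).
val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
df1.saveAsTable("spark_6016_repro", "parquet", SaveMode.Overwrite)
sql("select * from spark_6016_repro").collect()

// Overwrite the table so it is now backed by four parquet files. Reading it again
// mixes the two stale cached footers with the new files and fails while merging metadata.
val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
df2.saveAsTable("spark_6016_repro", "parquet", SaveMode.Overwrite)
sql("select * from spark_6016_repro").collect()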

@@ -374,8 +374,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
private[parquet] class FilteringParquetRowInputFormat
extends parquet.hadoop.ParquetInputFormat[Row] with Logging {

- private var footers: JList[Footer] = _

private var fileStatuses = Map.empty[Path, FileStatus]

override def createRecordReader(
@@ -396,46 +394,15 @@ private[parquet] class FilteringParquetRowInputFormat
}
}

- override def getFooters(jobContext: JobContext): JList[Footer] = {
- import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.footerCache

- if (footers eq null) {
- val conf = ContextUtil.getConfiguration(jobContext)
- val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
- val statuses = listStatus(jobContext)
- fileStatuses = statuses.map(file => file.getPath -> file).toMap
- if (statuses.isEmpty) {
- footers = Collections.emptyList[Footer]
- } else if (!cacheMetadata) {
- // Read the footers from HDFS
- footers = getFooters(conf, statuses)
- } else {
- // Read only the footers that are not in the footerCache
- val foundFooters = footerCache.getAllPresent(statuses)
- val toFetch = new ArrayList[FileStatus]
- for (s <- statuses) {
- if (!foundFooters.containsKey(s)) {
- toFetch.add(s)
- }
- }
- val newFooters = new mutable.HashMap[FileStatus, Footer]
- if (toFetch.size > 0) {
- val startFetch = System.currentTimeMillis
- val fetched = getFooters(conf, toFetch)
- logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - startFetch} ms")
- for ((status, i) <- toFetch.zipWithIndex) {
- newFooters(status) = fetched.get(i)
- }
- footerCache.putAll(newFooters)
- }
- footers = new ArrayList[Footer](statuses.size)
- for (status <- statuses) {
- footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
- }
- }
- }
+ // This is only a temporary solution since we need to use fileStatuses in
+ // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
+ // two methods.
+ override def getSplits(jobContext: JobContext): JList[InputSplit] = {
+ // First set fileStatuses.
+ val statuses = listStatus(jobContext)
+ fileStatuses = statuses.map(file => file.getPath -> file).toMap

- footers
+ super.getSplits(jobContext)
}

// TODO Remove this method and related code once PARQUET-16 is fixed
@@ -200,7 +200,7 @@ private[sql] case class ParquetRelation2(
private var commonMetadataStatuses: Array[FileStatus] = _

// Parquet footer cache.
- private var footers: Map[FileStatus, Footer] = _
+ var footers: Map[FileStatus, Footer] = _

// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
@@ -400,6 +400,7 @@ private[sql] case class ParquetRelation2(
} else {
metadataCache.dataStatuses.toSeq
}
+ val selectedFooters = selectedFiles.map(metadataCache.footers)

// FileInputFormat cannot handle empty lists.
if (selectedFiles.nonEmpty) {
@@ -447,11 +448,16 @@
@transient
val cachedStatus = selectedFiles

+ @transient
+ val cachedFooters = selectedFooters

// Overridden so we can inject our own cached files statuses.
override def getPartitions: Array[SparkPartition] = {
val inputFormat = if (cacheMetadata) {
new FilteringParquetRowInputFormat {
override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus

+ override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
}
} else {
new FilteringParquetRowInputFormat
@@ -29,6 +29,7 @@ import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
+ import org.apache.spark.sql.SaveMode

// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
@@ -409,6 +410,32 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
)
""")
}

test("SPARK-6016 make sure to use the latest footers") {
sql("drop table if exists spark_6016_fix")

// Create a DataFrame with two partitions. So, the created table will have two parquet files.
val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
checkAnswer(
sql("select * from spark_6016_fix"),
(1 to 10).map(i => Row(i))
)

// Create a DataFrame with four partitions. So, the created table will have four parquet files.
val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
// For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
// since the new table has four parquet files, we are trying to read new footers from two files
// and then merge metadata in footers of these four (two outdated ones and two latest one),
// which will cause an error.
checkAnswer(
sql("select * from spark_6016_fix"),
(1 to 10).map(i => Row(i))
)

sql("drop table spark_6016_fix")
}
}

class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {