[SPARK-17980][SQL] Fix refreshByPath for converted Hive tables
## What changes were proposed in this pull request?

There was a bug introduced in apache#14690 which broke `refreshByPath` with converted Hive tables (though it turns out it was already very difficult to refresh converted Hive tables, since you had to specify the exact path of one of the partitions).

This changes `refreshByPath` to invalidate by prefix instead of by exact match, which fixes the issue.
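To make the difference concrete, here is a minimal standalone sketch of exact-match versus prefix-match invalidation (illustrative paths, not Spark's internal API):

```scala
// Illustrative only: a cached relation's root paths versus the path passed to
// refreshByPath, both assumed to already be fully qualified strings.
val rootPaths = Seq("file:/warehouse/sales/part=2016-10-01")
val refreshed = "file:/warehouse/sales"

// Old behavior: exact match, so refreshing the table directory misses the entry.
val exactHit = rootPaths.contains(refreshed) // false

// New behavior: prefix match, so any root path under the refreshed directory is hit.
val prefixHit = rootPaths.exists(_.startsWith(refreshed)) // true
```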

cc sameeragarwal for the `refreshByPath` changes, cc mallman

## How was this patch tested?

Extended unit test.

Author: Eric Liang <[email protected]>

Closes apache#15521 from ericl/fix-caching.

ericl authored and Robert Kruszewski committed Oct 31, 2016
1 parent 79a296e commit a73c410

Showing 5 changed files with 39 additions and 10 deletions.
Catalog.scala
@@ -343,7 +343,8 @@ abstract class Catalog {

   /**
    * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
-   * contains the given data source path.
+   * contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
+   * everything that is cached.
    *
    * @since 2.0.0
    */
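The public entry point whose contract changes above is `Catalog.refreshByPath`. A brief usage sketch, assuming an active `SparkSession` named `spark` and a made-up warehouse path:

```scala
// Any cached file-based relation whose qualified root path starts with this
// prefix is invalidated and has its file listing refreshed (path is hypothetical).
spark.catalog.refreshByPath("/warehouse/sales")

// Per the documentation change above, "/" is a prefix of every path, so this
// would invalidate everything that is cached.
spark.catalog.refreshByPath("/")
```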
CacheManager.scala
@@ -185,9 +185,10 @@ class CacheManager extends Logging {
     plan match {
       case lr: LogicalRelation => lr.relation match {
         case hr: HadoopFsRelation =>
+          val prefixToInvalidate = qualifiedPath.toString
           val invalidate = hr.location.rootPaths
-            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
-            .contains(qualifiedPath)
+            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
+            .exists(_.startsWith(prefixToInvalidate))
           if (invalidate) hr.location.refresh()
           invalidate
         case _ => false
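A standalone sketch (Hadoop API only, not Spark code) of why both sides go through `Path.makeQualified` before the `startsWith` check: qualification attaches the filesystem scheme and resolves relative paths, so the prefix test compares like with like. The paths below are made up and use the local filesystem:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object QualifiedPrefixExample extends App {
  val fs = FileSystem.getLocal(new Configuration())

  // Qualify a path against the filesystem's URI and working directory, mirroring
  // the CacheManager change above (e.g. "/warehouse/t" becomes "file:/warehouse/t").
  def qualify(p: String): String =
    new Path(p).makeQualified(fs.getUri, fs.getWorkingDirectory).toString

  val cachedRoot = qualify("/warehouse/t/part=1")
  val refreshed = qualify("/warehouse/t")

  println(cachedRoot.startsWith(refreshed)) // true: the cached entry gets invalidated
}
```

Note that a plain string prefix can over-match (a sibling directory such as `/warehouse/t2` also starts with `/warehouse/t`), but for cache invalidation that errs on the safe side: the worst case is an unnecessary re-listing.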
TableFileCatalog.scala
@@ -48,13 +48,18 @@ class TableFileCatalog(

   private val baseLocation = catalogTable.storage.locationUri

+  // Populated on-demand by calls to cachedAllPartitions
+  private var cachedAllPartitions: ListingFileCatalog = null
+
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq

   override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
     filterPartitions(filters).listFiles(Nil)
   }

-  override def refresh(): Unit = {}
+  override def refresh(): Unit = synchronized {
+    cachedAllPartitions = null
+  }

   /**
    * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
@@ -64,7 +69,7 @@
    */
   def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
     if (filters.isEmpty) {
-      cachedAllPartitions
+      allPartitions
     } else {
       filterPartitions0(filters)
     }
@@ -89,9 +94,14 @@
   }

   // Not used in the hot path of queries when metastore partition pruning is enabled
-  lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil)
+  def allPartitions: ListingFileCatalog = synchronized {
+    if (cachedAllPartitions == null) {
+      cachedAllPartitions = filterPartitions0(Nil)
+    }
+    cachedAllPartitions
+  }

-  override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
+  override def inputFiles: Array[String] = allPartitions.inputFiles
 }

 /**
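The key change above is replacing a `lazy val` with an explicitly invalidatable cache: once a `lazy val` is forced it can never be recomputed, so the old no-op `refresh()` had no way to drop a stale partition listing. A generic sketch of the pattern, with illustrative names (not Spark code):

```scala
// A lazily populated value that, unlike a `lazy val`, can be cleared and rebuilt.
// `allPartitions`/`refresh()` above are this shape inlined into TableFileCatalog,
// with `filterPartitions0(Nil)` as the compute function.
class RefreshableCache[T >: Null <: AnyRef](compute: () => T) {
  private var cached: T = null

  def get: T = synchronized {
    if (cached == null) {
      cached = compute() // first access, or first access after a refresh
    }
    cached
  }

  def refresh(): Unit = synchronized {
    cached = null // the next get recomputes the value
  }
}
```

The `synchronized` blocks keep a concurrent `refresh()` from racing with a reader that is repopulating the field, matching the locking added to `TableFileCatalog` above.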
HiveMetastoreCatalog.scala
@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           if (lazyPruningEnabled) {
             catalog
           } else {
-            catalog.cachedAllPartitions
+            catalog.allPartitions
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
HiveMetadataCacheSuite.scala
@@ -80,9 +80,13 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
       val df = spark.sql("select * from test")
       assert(sql("select * from test").count() == 5)

+      def deleteRandomFile(): Unit = {
+        val p = new Path(spark.table("test").inputFiles.head)
+        assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
+      }
+
       // Delete a file, then assert that we tried to read it. This means the table was cached.
-      val p = new Path(spark.table("test").inputFiles.head)
-      assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, true))
+      deleteRandomFile()
       val e = intercept[SparkException] {
         sql("select * from test").count()
       }
@@ -91,6 +95,19 @@
       // Test refreshing the cache.
       spark.catalog.refreshTable("test")
       assert(sql("select * from test").count() == 4)
+      assert(spark.table("test").inputFiles.length == 4)
+
+      // Test refresh by path separately since it goes through different code paths than
+      // refreshTable does.
+      deleteRandomFile()
+      spark.catalog.cacheTable("test")
+      spark.catalog.refreshByPath("/some-invalid-path") // no-op
+      val e2 = intercept[SparkException] {
+        sql("select * from test").count()
+      }
+      assert(e2.getMessage.contains("FileNotFoundException"))
+      spark.catalog.refreshByPath(dir.getAbsolutePath)
+      assert(sql("select * from test").count() == 3)
     }
   }
 }
