From e708dd07cc1df9365fbd75beb6d0321bc135c555 Mon Sep 17 00:00:00 2001 From: Ian Li Date: Wed, 16 Dec 2015 18:56:37 -0800 Subject: [PATCH] SPY-885: CSD's FileSelector is not applied for Parquet in SparkSQL query --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 34 +++++++- .../sql/hive/HiveMetastoreCatalogSuite.scala | 82 +++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index e9af4e12d62ae..f71dfcc7b7eec 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -490,7 +490,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive parquetRelation } else { - val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) + val paths = selectParquetLocationDirectories( + metastoreRelation.tableName, + Option(metastoreRelation.hiveQlTable.getDataLocation.toString)) val cached = getCached(tableIdentifier, paths, metastoreSchema, None) val parquetRelation = cached.getOrElse { @@ -506,6 +508,36 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) } + /** + * Customizing the data directory selection by using hadoopFileSelector. + * + * The value of locationOpt will be returned as single element sequence if + * 1. the hadoopFileSelector is not defined or + * 2. locationOpt is not defined or + * 3. the selected directories are empty. + * + * Otherwise, the non-empty selected directories will be returned. 
+ */ + private[hive] def selectParquetLocationDirectories( + tableName: String, + locationOpt: Option[String]): Seq[String] = { + + val inputPaths: Option[Seq[String]] = for { + selector <- hive.hadoopFileSelector + l <- locationOpt + location = new Path(l) + fs = location.getFileSystem(hive.hiveconf) + selectedPaths <- selector.selectFiles(tableName, fs, location) + selectedDir = for { + selectedPath <- selectedPaths + if selectedPath.getFileSystem(hive.hiveconf).isDirectory(selectedPath) + } yield selectedPath.toString + if selectedDir.nonEmpty + } yield selectedDir + inputPaths.getOrElse(Seq(locationOpt.orNull)) + } + + override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { val db = databaseName.getOrElse(client.currentDatabase) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index d63f3d3996523..f4f57228c63c9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.SparkFunSuite +import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} @@ -47,6 +48,87 @@ class HiveMetastoreCatalogSuite extends SparkFunSuite with TestHiveSingleton { logInfo(df.queryExecution.toString) df.as('a).join(df.as('b), $"a.key" === $"b.key") } + +} + +class ParquetLocationSelectionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + import hiveContext._ + + val hmc = new HiveMetastoreCatalog(null, hiveContext) + // ensuring temp directories + val baseDir = { + val base = 
File.createTempFile("selectParquetLocationDirectories", "1", hiveFilesTemp) + base.delete() + base.mkdirs() + base + } + + test(s"With Selector selecting from ${baseDir.toString}") { + val fullpath = { (somewhere: String, sometable: String) => + s"${baseDir.toString}/$somewhere/$sometable" + } + + hiveContext.setHadoopFileSelector(new HadoopFileSelector() { + override def selectFiles( + sometable: String, + fs: FileSystem, + somewhere: Path): Option[Seq[Path]] = { + Some(Seq(new Path(fullpath(somewhere.toString, sometable)))) + } + }) + + // ensure directory existence for somewhere/sometable + val somewhereSometable = new File(fullpath("somewhere", "sometable")) + somewhereSometable.mkdirs() + // somewhere/sometable is a directory => will be selected + assertResult(Seq(fullpath("somewhere", "sometable"))) { + hmc.selectParquetLocationDirectories("sometable", Option("somewhere")) + } + + // ensure file existence for somewhere/sometable + somewhereSometable.delete() + somewhereSometable.createNewFile() + // somewhere/sometable is a file => will not be selected + assertResult(Seq("somewhere")) { + hmc.selectParquetLocationDirectories("otherplace", Option("somewhere")) + } + + // no location specified, none selected + assertResult(Seq(null)){ + hmc.selectParquetLocationDirectories("sometable", Option(null)) + } + } + + test("With Selector selecting None") { + hiveContext.setHadoopFileSelector(new HadoopFileSelector() { + override def selectFiles( + tableName: String, + fs: FileSystem, + basePath: Path): Option[Seq[Path]] = None + }) + + // none selected + assertResult(Seq("somewhere")) { + hmc.selectParquetLocationDirectories("sometable", Option("somewhere")) + } + // none selected + assertResult(Seq(null)) { + hmc.selectParquetLocationDirectories("sometable", Option(null)) + } + } + + test("Without Selector") { + hiveContext.unsetHadoopFileSelector() + + // none selected + assertResult(Seq("somewhere")) { + hmc.selectParquetLocationDirectories("sometable", 
Option("somewhere")) + } + // no selector configured: the location is returned unchanged + assertResult(Seq(null)) { + hmc.selectParquetLocationDirectories("sometable", Option(null)) + } + } } class DataSourceWithHiveMetastoreCatalogSuite