SPY-885 #138

Merged: 1 commit, Dec 23, 2015
@@ -490,7 +490,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive

parquetRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
val paths = selectParquetLocationDirectories(
metastoreRelation.tableName,
Option(metastoreRelation.hiveQlTable.getDataLocation.toString))

val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
@@ -506,6 +508,36 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
}

/**
* Customizes the data directory selection using hadoopFileSelector.
*
* The value of locationOpt is returned as a single-element sequence if
* 1. hadoopFileSelector is not defined, or
* 2. locationOpt is not defined, or
* 3. the set of selected directories is empty.
*
* Otherwise, the non-empty sequence of selected directories is returned.
*/
private[hive] def selectParquetLocationDirectories(
tableName: String,
locationOpt: Option[String]): Seq[String] = {

val inputPaths: Option[Seq[String]] = for {
selector <- hive.hadoopFileSelector
l <- locationOpt
location = new Path(l)
fs = location.getFileSystem(hive.hiveconf)
selectedPaths <- selector.selectFiles(tableName, fs, location)
selectedDir = for {
selectedPath <- selectedPaths
if selectedPath.getFileSystem(hive.hiveconf).isDirectory(selectedPath)
} yield selectedPath.toString
if selectedDir.nonEmpty
} yield selectedDir
inputPaths.getOrElse(Seq(locationOpt.orNull))
}


override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
val db = databaseName.getOrElse(client.currentDatabase)

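As a usage illustration (not part of this diff), the hook above could be driven as follows. This is a minimal sketch that assumes the setHadoopFileSelector / unsetHadoopFileSelector and HadoopFileSelector.selectFiles API exercised in the test suite below; the table name "events" and the partition subdirectory "dt=2015-12-23" are purely illustrative.

import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch: redirect reads of table "events" to a dated
// subdirectory of its data location; returning None for any other table
// keeps the default behaviour (the table's own location is used).
hiveContext.setHadoopFileSelector(new HadoopFileSelector() {
  override def selectFiles(
      tableName: String,
      fs: FileSystem,
      basePath: Path): Option[Seq[Path]] = {
    if (tableName == "events") Some(Seq(new Path(basePath, "dt=2015-12-23")))
    else None
  }
})

// With the selector installed, selectParquetLocationDirectories("events", Some(loc))
// returns the selected paths that are directories; if none qualify, or if the
// selector or the location is undefined, it falls back to Seq(loc).
// hiveContext.unsetHadoopFileSelector() removes the hook again.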
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import java.io.File

import org.apache.spark.SparkFunSuite
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
@@ -47,6 +48,87 @@ class HiveMetastoreCatalogSuite extends SparkFunSuite with TestHiveSingleton {
logInfo(df.queryExecution.toString)
df.as('a).join(df.as('b), $"a.key" === $"b.key")
}

}

class ParquetLocationSelectionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
import hiveContext._

val hmc = new HiveMetastoreCatalog(null, hiveContext)
// ensuring temp directories
val baseDir = {
val base = File.createTempFile("selectParquetLocationDirectories", "1", hiveFilesTemp)
base.delete()
base.mkdirs()
base
}

test(s"With Selector selecting from ${baseDir.toString}") {
val fullpath = { (somewhere: String, sometable: String) =>
s"${baseDir.toString}/$somewhere/$sometable"
}

hiveContext.setHadoopFileSelector(new HadoopFileSelector() {
override def selectFiles(
sometable: String,
fs: FileSystem,
somewhere: Path): Option[Seq[Path]] = {
Some(Seq(new Path(fullpath(somewhere.toString, sometable))))
}
})

// ensure directory existence for somewhere/sometable
val somewhereSometable = new File(fullpath("somewhere", "sometable"))
somewhereSometable.mkdirs()
// somewhere/sometable is a directory => will be selected
assertResult(Seq(fullpath("somewhere", "sometable"))) {
hmc.selectParquetLocationDirectories("sometable", Option("somewhere"))
}

// ensure file existence for somewhere/sometable
somewhereSometable.delete()
somewhereSometable.createNewFile()
// somewhere/sometable is a file => will not be selected
assertResult(Seq("somewhere")) {
hmc.selectParquetLocationDirectories("otherplace", Option("somewhere"))
}

// no location specified, none selected
assertResult(Seq(null)){
hmc.selectParquetLocationDirectories("sometable", Option(null))
}
}

test("With Selector selecting None") {
hiveContext.setHadoopFileSelector(new HadoopFileSelector() {
override def selectFiles(
tableName: String,
fs: FileSystem,
basePath: Path): Option[Seq[Path]] = None
})

// none selected
assertResult(Seq("somewhere")) {
hmc.selectParquetLocationDirectories("sometable", Option("somewhere"))
}
// none selected
assertResult(Seq(null)) {
hmc.selectParquetLocationDirectories("sometable", Option(null))
}
}

test("Without Selector") {
hiveContext.unsetHadoopFileSelector()

// none selected
assertResult(Seq("somewhere")) {
hmc.selectParquetLocationDirectories("sometable", Option("somewhere"))
}
// none selected
assertResult(Seq(null)) {
hmc.selectParquetLocationDirectories("sometable", Option(null))
}
}
}

class DataSourceWithHiveMetastoreCatalogSuite