Merge pull request #26 from liancheng/serdeAndPartitionPruning
Hive SerDe support and partition pruning optimization
marmbrus committed Jan 28, 2014
2 parents 9dd3b26 + c263c84 commit 5dab0bc
Showing 11 changed files with 379 additions and 87 deletions.
5 changes: 4 additions & 1 deletion build.sbt
@@ -12,7 +12,10 @@ scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked")

resolvers += "Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/.m2/repository"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating-SNAPSHOT"
// TODO: Remove when Spark 0.9.0 is released for real.
resolvers += "SparkStaging" at "https://repository.apache.org/content/repositories/orgapachespark-1006/"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

libraryDependencies += "catalyst" % "hive-golden" % "4" % "test" from "http://repository-databricks.forge.cloudbees.com/snapshot/catalystGolden4.jar"

30 changes: 29 additions & 1 deletion src/main/scala/catalyst/analysis/Analyzer.scala
@@ -34,6 +34,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
PreInsertionCasts ::
typeCoercionRules :_*)
)

@@ -106,7 +107,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
case agg: AggregateExpression => return true
case _ =>
})
return false
false
}
}

@@ -141,4 +142,31 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
protected def containsStar(exprs: Seq[NamedExpression]): Boolean =
exprs.collect { case _: Star => true }.nonEmpty
}

/**
* Casts input data to the correct data types according to the table definition before inserting
* into that table.
*/
object PreInsertionCasts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
// Wait until children are resolved
case p: LogicalPlan if !p.childrenResolved => p

case p @ InsertIntoTable(table, _, child) =>
val childOutputDataTypes = child.output.map(_.dataType)
val tableOutputDataTypes = table.output.map(_.dataType)

if (childOutputDataTypes sameElements tableOutputDataTypes) {
p
} else {
// Only do the casting when child output data types differ from table output data types.
val castedChildOutput = child.output.zip(table.output).map {
case (l, r) if l.dataType != r.dataType => Alias(Cast(l, r.dataType), l.name)()
case (l, _) => l
}

p.copy(child = Project(castedChildOutput, child))
}
}
}
}
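For context on the new PreInsertionCasts rule above, here is a minimal, self-contained sketch of the casting logic it applies. It is written against stand-in case classes rather than the real catalyst types; the Attribute class, the schemas, and the printed CAST strings are all invented for illustration.

object PreInsertionCastsSketch extends App {
  case class Attribute(name: String, dataType: String)

  // Hypothetical schemas: the query produces `key` as a string, but the target
  // table declares it as an int.
  val tableOutput = Seq(Attribute("key", "int"), Attribute("value", "string"))
  val childOutput = Seq(Attribute("key", "string"), Attribute("value", "string"))

  // Columns whose types already match the table pass through unchanged; the
  // rest are wrapped in casts, mirroring the Alias(Cast(...)) in the rule.
  val castedChildOutput = childOutput.zip(tableOutput).map {
    case (l, r) if l.dataType != r.dataType => s"CAST(${l.name} AS ${r.dataType}) AS ${l.name}"
    case (l, _)                             => l.name
  }

  // Prints: CAST(key AS int) AS key, value
  println(castedChildOutput.mkString(", "))
}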
4 changes: 3 additions & 1 deletion src/main/scala/catalyst/execution/SharkInstance.scala
@@ -73,6 +73,7 @@ abstract class SharkInstance extends Logging {
val sc = self.sc
val strategies =
SparkEquiInnerJoin ::
PartitionPrunings ::
HiveTableScans ::
DataSinks ::
BasicOperators ::
@@ -81,7 +82,8 @@
}

object PrepareForExecution extends RuleExecutor[SharkPlan] {
val batches = Batch("Prepare Expressions", Once, expressions.BindReferences) :: Nil
val batches =
Batch("Prepare Expressions", Once, new expressions.BindReferences[SharkPlan]) :: Nil
}

class SharkSqlQuery(sql: String) extends SharkQuery {
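To illustrate what the PartitionPrunings strategy added to SharkInstance above is aiming for, here is a rough, self-contained sketch of partition pruning over made-up partition metadata. The Partition class and the predicate below are stand-ins, not the actual Shark/catalyst API.

object PartitionPruningSketch extends App {
  case class Partition(spec: Map[String, String])

  val partitions = Seq(
    Partition(Map("ds" -> "2008-04-08", "hr" -> "11")),
    Partition(Map("ds" -> "2008-04-08", "hr" -> "12")),
    Partition(Map("ds" -> "2008-04-09", "hr" -> "11")),
    Partition(Map("ds" -> "2008-04-09", "hr" -> "12")))

  // Stand-in for a pushed-down predicate such as WHERE ds = '2008-04-08' AND hr = '11'.
  val prunePredicate: Partition => Boolean =
    p => p.spec("ds") == "2008-04-08" && p.spec("hr") == "11"

  // Pruning is evaluated against partition values before any files are opened,
  // so the three non-matching partitions contribute no I/O at all.
  val selected = partitions.filter(prunePredicate)
  println(selected.map(_.spec))
}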
24 changes: 11 additions & 13 deletions src/main/scala/catalyst/execution/TableReader.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.{HadoopRDD, UnionRDD, EmptyRDD, RDD}
* type of table storage: HeapTableReader for Shark tables in Spark's block manager,
* TachyonTableReader for tables in Tachyon, and HadoopTableReader for Hive tables in a filesystem.
*/
sealed trait TableReader {
private[catalyst] sealed trait TableReader {

def makeRDDForTable(hiveTable: HiveTable): RDD[_]

@@ -34,7 +34,7 @@ sealed trait TableReader {
* Helper class for scanning tables stored in Hadoop - e.g., to read Hive tables that reside in the
* data warehouse directory.
*/
class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf: HiveConf)
private[catalyst] class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf: HiveConf)
extends TableReader {

// Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless
@@ -93,11 +93,10 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
deserializer.initialize(hconf, tableDesc.getProperties)

// Deserialize each Writable to get the row value.
iter.map { value =>
value match {
case v: Writable => deserializer.deserialize(v)
case _ => throw new RuntimeException("Failed to match " + value.toString)
}
iter.map {
case v: Writable => deserializer.deserialize(v)
case value =>
sys.error(s"Unable to deserialize non-Writable: $value of ${value.getClass.getName}")
}
}
deserializedHadoopRDD
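A stand-in sketch of the tightened deserialization pattern above: one deserializer handles a whole partition, Writable records are passed through it, and anything else fails fast with sys.error. The Writable, Text, and FakeDeserializer types here are invented for illustration and are not the Hadoop/Hive classes.

object DeserializeSketch extends App {
  trait Writable
  case class Text(value: String) extends Writable

  // Invented deserializer; the real code uses the table's Hive SerDe.
  class FakeDeserializer {
    def deserialize(w: Writable): Any = w match { case Text(v) => v.toUpperCase }
  }

  val partition: Iterator[Any] = Iterator(Text("a"), Text("b"))
  val deserializer = new FakeDeserializer // initialized once per partition

  val rows = partition.map {
    case v: Writable => deserializer.deserialize(v)
    case value       => sys.error(s"Unable to deserialize non-Writable: $value")
  }
  println(rows.toList) // List(A, B)
}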
@@ -130,8 +129,8 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
val ifc = partDesc.getInputFileFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
// Get partition field info
val partSpec = partDesc.getPartSpec()
val partProps = partDesc.getProperties()
val partSpec = partDesc.getPartSpec
val partProps = partDesc.getProperties

val partColsDelimited: String = partProps.getProperty(META_TABLE_PARTITION_COLUMNS)
// Partitioning columns are delimited by "/"
@@ -156,7 +155,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
iter.map { value =>
val deserializer = localDeserializer.newInstance()
deserializer.initialize(hconf, partProps)
val deserializedRow = deserializer.deserialize(value) // LazyStruct
val deserializedRow = deserializer.deserialize(value)
rowWithPartArr.update(0, deserializedRow)
rowWithPartArr.update(1, partValues)
rowWithPartArr.asInstanceOf[Object]
@@ -177,11 +176,10 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
*/
private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
filterOpt match {
case Some(filter) => {
case Some(filter) =>
val fs = path.getFileSystem(_localHConf)
val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
filteredFiles.mkString(",")
}
case None => path.toString
}
}
@@ -212,7 +210,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf

}

object HadoopTableReader {
private[catalyst] object HadoopTableReader {

/**
* Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to
72 changes: 65 additions & 7 deletions src/main/scala/catalyst/execution/TestShark.scala
@@ -11,6 +11,10 @@ import scala.language.implicitConversions
import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.ql.io.avro.{AvroContainerOutputFormat, AvroContainerInputFormat}
import org.apache.hadoop.hive.serde2.avro.AvroSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.serde2.RegexSerDe

import analysis._
import plans.logical.LogicalPlan
@@ -164,11 +168,20 @@ object TestShark extends SharkInstance {
"CREATE TABLE IF NOT EXISTS dest3 (key INT, value STRING)".cmd),
TestTable("srcpart", () => {
runSqlHive("CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
Seq("2008-04-08", "2008-04-09").foreach { ds =>
Seq("11", "12").foreach { hr =>
val partSpec = Map("ds" -> ds, "hr" -> hr)
runSqlHive(s"LOAD DATA LOCAL INPATH '${hiveDevHome.getCanonicalPath}/data/files/kv1.txt' OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')")
}
for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
runSqlHive(
s"""LOAD DATA LOCAL INPATH '${hiveDevHome.getCanonicalPath}/data/files/kv1.txt'
|OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
""".stripMargin)
}
}),
TestTable("srcpart1", () => {
runSqlHive("CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
runSqlHive(
s"""LOAD DATA LOCAL INPATH '${hiveDevHome.getCanonicalPath}/data/files/kv1.txt'
|OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
""".stripMargin)
}
}),
TestTable("src_thrift", () => {
Expand All @@ -194,7 +207,51 @@ object TestShark extends SharkInstance {
catalog.client.createTable(srcThrift)

runSqlHive(s"LOAD DATA LOCAL INPATH '${hiveDevHome.getCanonicalPath}/data/files/complex.seq' INTO TABLE src_thrift")
})
}),
TestTable("serdeins",
s"""CREATE TABLE serdeins (key INT, value STRING)
|ROW FORMAT SERDE '${classOf[LazySimpleSerDe].getCanonicalName}'
|WITH SERDEPROPERTIES ('field.delim'='\\t')
""".stripMargin.cmd,
"INSERT OVERWRITE TABLE serdeins SELECT * FROM src".cmd),
TestTable("sales",
s"""CREATE TABLE IF NOT EXISTS sales (key STRING, value INT)
|ROW FORMAT SERDE '${classOf[RegexSerDe].getCanonicalName}'
|WITH SERDEPROPERTIES ("input.regex" = "([^ ]*)\t([^ ]*)")
""".stripMargin.cmd,
s"LOAD DATA LOCAL INPATH '${hiveDevHome.getCanonicalPath}/data/files/sales.txt' INTO TABLE sales".cmd),
TestTable("episodes",
s"""CREATE TABLE episodes (title STRING, air_date STRING, doctor INT)
|ROW FORMAT SERDE '${classOf[AvroSerDe].getCanonicalName}'
|STORED AS
|INPUTFORMAT '${classOf[AvroContainerInputFormat].getCanonicalName}'
|OUTPUTFORMAT '${classOf[AvroContainerOutputFormat].getCanonicalName}'
|TBLPROPERTIES (
| 'avro.schema.literal'='{
| "type": "record",
| "name": "episodes",
| "namespace": "testing.hive.avro.serde",
| "fields": [
| {
| "name": "title",
| "type": "string",
| "doc": "episode title"
| },
| {
| "name": "air_date",
| "type": "string",
| "doc": "initial date"
| },
| {
| "name": "doctor",
| "type": "int",
| "doc": "main actor playing the Doctor in episode"
| }
| ]
| }'
|)
""".stripMargin.cmd,
s"LOAD DATA LOCAL INPATH '${hiveDevHome.getCanonicalPath}/data/files/episodes.avro' INTO TABLE episodes".cmd)
)
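As a side note on the new sales test table defined above: the sketch below shows how its RegexSerDe pattern ("([^ ]*)\t([^ ]*)") carves a tab-delimited line into a key and a value, using plain Scala regex matching as a stand-in for the SerDe; the sample line is invented.

object RegexSerDeSketch extends App {
  // Same pattern as the table's input.regex serde property.
  val inputRegex = "([^ ]*)\t([^ ]*)".r

  // Hypothetical sales.txt line: a key and an integer count separated by a tab.
  "Joe\t2" match {
    case inputRegex(key, value) => println(s"key=$key, value=${value.toInt}")
    case other                  => println(s"unmatched line: $other")
  }
}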

hiveQTestUtilTables.foreach(registerTestTable)
@@ -203,11 +260,12 @@ object TestShark extends SharkInstance {

def loadTestTable(name: String) {
if (!(loadedTables contains name)) {
// Marks the table as loaded first to prevent infinite mutually recursive table loading.
loadedTables += name
logger.info(s"Loading test table $name")
val createCmds =
testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name"))
createCmds.foreach(_())
loadedTables += name
}
}
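To see why loadTestTable now marks the table as loaded before running its commands, here is a toy sketch with an invented pair of mutually dependent tables: marking only after the commands ran would recurse forever, while marking first terminates.

object LoadOrderSketch extends App {
  import scala.collection.mutable

  // Each table's "commands" just load the tables it depends on (invented graph).
  val dependencies = Map("a" -> Seq("b"), "b" -> Seq("a"))
  val loadedTables = mutable.HashSet.empty[String]

  def loadTestTable(name: String): Unit = {
    if (!(loadedTables contains name)) {
      loadedTables += name                      // mark first, as in the real code
      dependencies(name).foreach(loadTestTable) // then run the dependent loads
    }
  }

  loadTestTable("a")
  println(loadedTables) // terminates with both tables marked
}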
