Spark diditaxi #4

Merged (2 commits), Mar 21, 2015
@@ -28,12 +28,15 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.util.ShutdownHookManager

/**
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
* `HiveThriftServer2` thrift server.
*/
object HiveThriftServer2 extends Logging {
val SHUTDOWN_HOOK_PRIORITY: Int = 30
var LOG = LogFactory.getLog(classOf[HiveServer2])

/**
@@ -57,12 +60,14 @@ object HiveThriftServer2 extends Logging {
logInfo("Starting SparkContext")
SparkSQLEnv.init()

Runtime.getRuntime.addShutdownHook(
new Thread() {
// Clean up after we exit. Use higher priority than FileSystem.
assert(SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
ShutdownHookManager.get().addShutdownHook(
new Runnable {
override def run() {
SparkSQLEnv.stop()
}
}
}, SHUTDOWN_HOOK_PRIORITY
)

try {
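For context, here is a minimal, hedged sketch of the shutdown-hook pattern this change adopts in this entry point (and again in SparkSQLCLIDriver below). The standalone object and the registerCleanup helper are hypothetical illustrations; ShutdownHookManager.get().addShutdownHook(runnable, priority) and FileSystem.SHUTDOWN_HOOK_PRIORITY are existing Hadoop APIs, and the priority value mirrors the SHUTDOWN_HOOK_PRIORITY constant added in this diff:

    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.util.ShutdownHookManager

    // Register a cleanup hook that runs before Hadoop's FileSystem shutdown hook,
    // so the cleanup can still reach HDFS while the JVM is shutting down.
    object ShutdownHookExample {
      // Hooks with higher priority run first; keep this above
      // FileSystem.SHUTDOWN_HOOK_PRIORITY, as the diff asserts at registration time.
      val SHUTDOWN_HOOK_PRIORITY: Int = 30

      def registerCleanup(cleanup: () => Unit): Unit = {
        assert(SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
        ShutdownHookManager.get().addShutdownHook(
          new Runnable {
            override def run(): Unit = cleanup()
          },
          SHUTDOWN_HOOK_PRIORITY)
      }
    }

    // Usage in the spirit of the diff:
    // ShutdownHookExample.registerCleanup(() => SparkSQLEnv.stop())

Unlike Runtime.getRuntime.addShutdownHook, the Hadoop manager orders hooks by priority, which is why the diff asserts that its priority exceeds FileSystem's before registering.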
@@ -27,24 +27,25 @@ import jline.{ConsoleReader, History}
import org.apache.commons.lang.StringUtils
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException
import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils, LogUtils}
import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.processors.{SetProcessor, CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.processors.{SetProcessor, CommandProcessor}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.thrift.transport.TSocket

import org.apache.spark.Logging
import org.apache.spark.sql.hive.HiveShim

private[hive] object SparkSQLCLIDriver {
val SHUTDOWN_HOOK_PRIORITY: Int = 30
private var prompt = "spark-sql"
private var continuedPrompt = "".padTo(prompt.length, ' ')
private var transport:TSocket = _
private var transport: TSocket = _

installSignalHandler()

@@ -92,23 +93,27 @@ private[hive] object SparkSQLCLIDriver {

// Set all properties specified via command line.
val conf: HiveConf = sessionState.getConf
sessionState.cmdProperties.entrySet().foreach { item: java.util.Map.Entry[Object, Object] =>
conf.set(item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
sessionState.getOverriddenConfigurations.put(
item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
sessionState.cmdProperties.entrySet().foreach {
item: java.util.Map.Entry[Object, Object] =>
conf.set(item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
sessionState.getOverriddenConfigurations.put(
item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
}

SessionState.start(sessionState)

// Clean up after we exit
Runtime.getRuntime.addShutdownHook(
new Thread() {
// Clean up after we exit. Use higher priority than FileSystem.
assert(SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
ShutdownHookManager.get().addShutdownHook(
new Runnable {
override def run() {
SparkSQLEnv.stop()
}
}
},
SHUTDOWN_HOOK_PRIORITY
)


// "-h" option has been passed, so connect to Hive thrift server.
if (sessionState.getHost != null) {
sessionState.connect()
@@ -175,12 +180,12 @@ private[hive] object SparkSQLCLIDriver {
reader.setHistory(new History(new File(historyFile)))
} else {
System.err.println("WARNING: Directory for Hive history file: " + historyDirectory +
" does not exist. History will not be available during this session.")
" does not exist. History will not be available during this session.")
}
} catch {
case e: Exception =>
System.err.println("WARNING: Encountered an error while trying to initialize Hive's " +
"history file. History will not be available during this session.")
"history file. History will not be available during this session.")
System.err.println(e.getMessage)
}

@@ -268,13 +273,13 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {

driver.init()
val out = sessionState.out
val start:Long = System.currentTimeMillis()
val start: Long = System.currentTimeMillis()
if (sessionState.getIsVerbose) {
out.println(cmd)
}
val rc = driver.run(cmd)
val end = System.currentTimeMillis()
val timeTaken:Double = (end - start) / 1000.0
val timeTaken: Double = (end - start) / 1000.0

ret = rc.getResponseCode
if (ret != 0) {
@@ -287,22 +292,24 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {

if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)) {
// Print the column names.
Option(driver.getSchema.getFieldSchemas).map { fields =>
out.println(fields.map(_.getName).mkString("\t"))
Option(driver.getSchema.getFieldSchemas).map {
fields =>
out.println(fields.map(_.getName).mkString("\t"))
}
}

var counter = 0
try {
while (!out.checkError() && driver.getResults(res)) {
res.foreach{ l =>
counter += 1
out.println(l)
res.foreach {
l =>
counter += 1
out.println(l)
}
res.clear()
}
} catch {
case e:IOException =>
case e: IOException =>
console.printError(
s"""Failed with exception ${e.getClass.getName}: ${e.getMessage}
|${org.apache.hadoop.util.StringUtils.stringifyException(e)}
@@ -319,7 +326,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
if (counter != 0) {
responseMsg += s", Fetched $counter row(s)"
}
console.printInfo(responseMsg , null)
console.printInfo(responseMsg, null)
// Destroy the driver to release all the locks.
driver.destroy()
} else {
87 changes: 73 additions & 14 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -116,7 +116,7 @@ class HadoopTableReader(
val hconf = broadcastedHiveConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
}

deserializedHadoopRDD
@@ -142,7 +142,46 @@ class HadoopTableReader(
partitionToDeserializer: Map[HivePartition,
Class[_ <: Deserializer]],
filterOpt: Option[PathFilter]): RDD[Row] = {
val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>

// SPARK-5068: get FileStatus and do the filtering locally when the path does not exist
def verifyPartitionPath(
partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
Map[HivePartition, Class[_ <: Deserializer]] = {
if (!sc.getConf("spark.sql.hive.verifyPartitionPath", "true").toBoolean) {
partitionToDeserializer
} else {
var existPathSet = collection.mutable.Set[String]()
var pathPatternSet = collection.mutable.Set[String]()
partitionToDeserializer.filter {
case (partition, partDeserializer) =>
def updateExistPathSetByPathPattern(pathPatternStr: String) {
val pathPattern = new Path(pathPatternStr)
val fs = pathPattern.getFileSystem(sc.hiveconf)
val matches = fs.globStatus(pathPattern)
matches.map(fileStatus => existPathSet += fileStatus.getPath.toString)
}
// convert /demo/data/year/month/day to /demo/data/**/**/**/
def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
var path = tempPath
for (i <- (1 to parNum)) path = path.getParent
val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
path.toString + tails
}

val partPath = HiveShim.getDataLocationPath(partition)
val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
var pathPatternStr = getPathPatternByPath(partNum, partPath)
if (!pathPatternSet.contains(pathPatternStr)) {
pathPatternSet += pathPatternStr
updateExistPathSetByPathPattern(pathPatternStr)
}
existPathSet.contains(partPath.toString)
}
}
}

val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
.map { case (partition, partDeserializer) =>
val partDesc = Utilities.getPartitionDesc(partition)
val partPath = HiveShim.getDataLocationPath(partition)
val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
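The verifyPartitionPath hunk above is the heart of SPARK-5068: build a glob from each partition location, expand it once per distinct pattern, and drop partitions whose directories no longer exist. Below is a hedged, standalone sketch of that idea; the object and method names (PartitionPathFilterSketch, globFor, existingPaths) are hypothetical, while Path.getParent, Path.getFileSystem, FileSystem.globStatus, and FileSystem.makeQualified are existing Hadoop APIs:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Turn a partition location such as /demo/data/year=2015/month=03/day=21
    // (three partition columns) into the glob /demo/data/*/*/*/, expand it once
    // with globStatus, and keep only partitions whose directories actually exist.
    object PartitionPathFilterSketch {
      // Replace the last `partitionColumns` path components with "*".
      def globFor(partPath: Path, partitionColumns: Int): String = {
        var base = partPath
        for (_ <- 1 to partitionColumns) base = base.getParent
        base.toString + (1 to partitionColumns).map(_ => "*").mkString("/", "/", "/")
      }

      // Expand the glob against the file system backing the pattern.
      def existingPaths(pattern: String, conf: Configuration): Set[String] = {
        val p = new Path(pattern)
        val fs = p.getFileSystem(conf)
        // globStatus returns null when nothing matches.
        Option(fs.globStatus(p))
          .map(_.map(_.getPath.toString).toSet)
          .getOrElse(Set.empty)
      }

      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        val part = new Path("/demo/data/year=2015/month=03/day=21")
        // Qualify the path the same way globStatus qualifies its results,
        // so the membership check compares like with like.
        val qualified = part.getFileSystem(conf).makeQualified(part)
        val keep = existingPaths(globFor(qualified, 3), conf).contains(qualified.toString)
        println(s"keep partition? $keep")
      }
    }

Grouping lookups by pattern keeps the number of globStatus calls proportional to the number of distinct partition layouts rather than the number of partitions, which is what the pathPatternSet in the diff is for.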
@@ -189,10 +228,12 @@ class HadoopTableReader(
val hconf = broadcastedHiveConf.value.value
val deserializer = localDeserializer.newInstance()
deserializer.initialize(hconf, partProps)

// get the table deserializer
val tableSerDe = tableDesc.getDeserializerClass.newInstance()
tableSerDe.initialize(hconf, tableDesc.getProperties)
// fill the non partition key attributes
HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
}
HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
mutableRow, tableSerDe) }
}.toSeq

// Even if we don't use any partitions, we still need an empty RDD
@@ -261,25 +302,39 @@ private[hive] object HadoopTableReader extends HiveInspectors {
* Transform all given raw `Writable`s into `Row`s.
*
* @param iterator Iterator of all `Writable`s to be transformed
* @param deserializer The `Deserializer` associated with the input `Writable`
* @param rawDeser The `Deserializer` associated with the input `Writable`
* @param nonPartitionKeyAttrs Attributes that should be filled together with their corresponding
* positions in the output schema
* @param mutableRow A reusable `MutableRow` that should be filled
* @param tableDeser Table Deserializer
* @return An `Iterator[Row]` transformed from `iterator`
*/
def fillObject(
iterator: Iterator[Writable],
deserializer: Deserializer,
rawDeser: Deserializer,
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
mutableRow: MutableRow): Iterator[Row] = {
mutableRow: MutableRow,
tableDeser: Deserializer): Iterator[Row] = {

val soi = if (rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
rawDeser.getObjectInspector.asInstanceOf[StructObjectInspector]
} else {
/* HiveShim.getConvertedOI(
rawDeser.getObjectInspector,
tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]*/
ObjectInspectorConverters.getConvertedOI(rawDeser.getObjectInspector,
tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]

}

val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
soi.getStructFieldRef(attr.name) -> ordinal
}.unzip

// Builds specific unwrappers ahead of time according to object inspector types to avoid pattern
// matching and branching costs per row.
/**
* Builds specific unwrappers ahead of time according to object inspector
* types to avoid pattern matching and branching costs per row.
*/
val unwrappers: Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
_.getFieldObjectInspector match {
case oi: BooleanObjectInspector =>
@@ -307,7 +362,8 @@ private[hive] object HadoopTableReader extends HiveInspectors {
row.update(ordinal, oi.getPrimitiveJavaObject(value).clone())
case oi: DateObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) =>
row.update(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
// row.update(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
row.update(ordinal, oi.getPrimitiveJavaObject(value))
case oi: BinaryObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) =>
row.update(ordinal, oi.getPrimitiveJavaObject(value))
@@ -316,9 +372,11 @@ private[hive] object HadoopTableReader extends HiveInspectors {
}
}

val converter = ObjectInspectorConverters.getConverter(rawDeser.getObjectInspector, soi)

// Map each tuple to a row object
iterator.map { value =>
val raw = deserializer.deserialize(value)
val raw = converter.convert(rawDeser.deserialize(value))
var i = 0
while (i < fieldRefs.length) {
val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
Expand All @@ -334,3 +392,4 @@ private[hive] object HadoopTableReader extends HiveInspectors {
}
}
}
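To make the converter logic in fillObject easier to follow in isolation, here is a hedged sketch built on the same Hive SerDe calls the diff uses (ObjectInspectorConverters.getConvertedOI and getConverter); the helper object and method names are hypothetical, and this is not the actual fillObject implementation:

    import org.apache.hadoop.hive.serde2.Deserializer
    import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
    import org.apache.hadoop.io.Writable

    // When a partition was written with an older SerDe or schema than the table
    // currently declares, the partition's ObjectInspector differs from the table's.
    // Build a converted inspector once, then convert every deserialized record
    // before its fields are read.
    object InspectorConversionSketch {
      def targetInspector(rawDeser: Deserializer, tableDeser: Deserializer): StructObjectInspector = {
        val rawOI = rawDeser.getObjectInspector
        val tableOI = tableDeser.getObjectInspector
        if (rawOI.equals(tableOI)) {
          rawOI.asInstanceOf[StructObjectInspector]
        } else {
          ObjectInspectorConverters.getConvertedOI(rawOI, tableOI)
            .asInstanceOf[StructObjectInspector]
        }
      }

      def convertedRecords(
          input: Iterator[Writable],
          rawDeser: Deserializer,
          soi: StructObjectInspector): Iterator[Any] = {
        // getConverter returns an identity converter when both inspectors match.
        val converter = ObjectInspectorConverters.getConverter(rawDeser.getObjectInspector, soi)
        input.map(w => converter.convert(rawDeser.deserialize(w)))
      }
    }

The identity-converter case keeps the common path (partition SerDe identical to table SerDe) essentially free, which matches the equals check added at the top of fillObject.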