From c464b262b0fc5cec014f39c9f55b319d7e18951d Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 16 Sep 2014 00:06:25 -0700 Subject: [PATCH 1/5] Refactors dynamic partitioning support Conflicts: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala --- .../{ => sql/hive}/SparkHadoopWriter.scala | 197 ++++++-------- .../hive/execution/InsertIntoHiveTable.scala | 254 ++++++------------ 2 files changed, 172 insertions(+), 279 deletions(-) rename sql/hive/src/main/scala/org/apache/spark/{ => sql/hive}/SparkHadoopWriter.scala (50%) diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala similarity index 50% rename from sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala index bbb66ae6005bd..6e07b51f49230 100644 --- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala @@ -21,20 +21,23 @@ import java.io.IOException import java.text.NumberFormat import java.util.Date +import scala.collection.mutable + import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} import org.apache.hadoop.hive.ql.plan.FileSinkDesc -import org.apache.hadoop.mapred._ import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred._ +import org.apache.spark.sql.Row import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} /** * Internal helper class that saves an RDD using a Hive OutputFormat. * It is based on [[SparkHadoopWriter]]. 
*/ -private[hive] class SparkHiveHadoopWriter( +private[hive] class SparkHiveWriterContainer( @transient jobConf: JobConf, fileSinkConf: FileSinkDesc) extends Logging @@ -42,7 +45,7 @@ private[hive] class SparkHiveHadoopWriter( with Serializable { private val now = new Date() - private val conf = new SerializableWritable(jobConf) + protected val conf = new SerializableWritable(jobConf) private var jobID = 0 private var splitID = 0 @@ -51,152 +54,75 @@ private[hive] class SparkHiveHadoopWriter( private var taID: SerializableWritable[TaskAttemptID] = null @transient private var writer: FileSinkOperator.RecordWriter = null - @transient private var format: HiveOutputFormat[AnyRef, Writable] = null - @transient private var committer: OutputCommitter = null - @transient private var jobContext: JobContext = null - @transient private var taskContext: TaskAttemptContext = null + @transient private lazy val committer = conf.value.getOutputCommitter + @transient private lazy val jobContext = newJobContext(conf.value, jID.value) + @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value) + @transient private lazy val outputFormat = + conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]] - def preSetup() { + def driverSideSetup() { setIDs(0, 0, 0) setConfParams() - - val jCtxt = getJobContext() - getOutputCommitter().setupJob(jCtxt) + committer.setupJob(jobContext) } - - def setup(jobid: Int, splitid: Int, attemptid: Int) { - setIDs(jobid, splitid, attemptid) + def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) { + setIDs(jobId, splitId, attemptId) setConfParams() - } - - def open() { - val numfmt = NumberFormat.getInstance() - numfmt.setMinimumIntegerDigits(5) - numfmt.setGroupingUsed(false) - - val extension = Utilities.getFileExtension( - conf.value, - fileSinkConf.getCompressed, - getOutputFormat()) - - val outputName = "part-" + numfmt.format(splitID) + extension - val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName) - - getOutputCommitter().setupTask(getTaskContext()) - writer = HiveFileFormatUtils.getHiveRecordWriter( - conf.value, - fileSinkConf.getTableInfo, - conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], - fileSinkConf, - path, - null) + committer.setupTask(taskContext) } /** - * create an HiveRecordWriter. imitate the above function open() - * @param dynamicPartPath the relative path for dynamic partition - * - * since this function is used to create different writer for - * different dynamic partition.So we need a parameter dynamicPartPath - * and use it we can calculate a new path and pass the new path to - * the function HiveFileFormatUtils.getHiveRecordWriter + * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer + * for writing data to a dynamic partition. 
*/ - def open(dynamicPartPath: String) { - val numfmt = NumberFormat.getInstance() - numfmt.setMinimumIntegerDigits(5) - numfmt.setGroupingUsed(false) - - val extension = Utilities.getFileExtension( - conf.value, - fileSinkConf.getCompressed, - getOutputFormat()) - - val outputName = "part-" + numfmt.format(splitID) + extension - val outputPath: Path = FileOutputFormat.getOutputPath(conf.value) - if (outputPath == null) { - throw new IOException("Undefined job output-path") - } - val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) // remove "/" - val path = new Path(workPath, outputName) - getOutputCommitter().setupTask(getTaskContext()) + def open() { writer = HiveFileFormatUtils.getHiveRecordWriter( conf.value, fileSinkConf.getTableInfo, conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], fileSinkConf, - path, + FileOutputFormat.getTaskOutputPath(conf.value, getOutputName), Reporter.NULL) } - def write(value: Writable) { - if (writer != null) { - writer.write(value) - } else { - throw new IOException("Writer is null, open() has not been called") - } + protected def getOutputName: String = { + val numberFormat = NumberFormat.getInstance() + numberFormat.setMinimumIntegerDigits(5) + numberFormat.setGroupingUsed(false) + val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat) + "part-" + numberFormat.format(splitID) + extension } + def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer + def close() { // Seems the boolean value passed into close does not matter. writer.close(false) } def commit() { - val taCtxt = getTaskContext() - val cmtr = getOutputCommitter() - if (cmtr.needsTaskCommit(taCtxt)) { + if (committer.needsTaskCommit(taskContext)) { try { - cmtr.commitTask(taCtxt) + committer.commitTask(taskContext) logInfo (taID + ": Committed") } catch { case e: IOException => logError("Error committing the output of task: " + taID.value, e) - cmtr.abortTask(taCtxt) + committer.abortTask(taskContext) throw e } } else { - logWarning ("No need to commit output of task: " + taID.value) + logInfo("No need to commit output of task: " + taID.value) } } def commitJob() { - // always ? Or if cmtr.needsTaskCommit ? 
- val cmtr = getOutputCommitter() - cmtr.commitJob(getJobContext()) + committer.commitJob(jobContext) } // ********* Private Functions ********* - private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = { - if (format == null) { - format = conf.value.getOutputFormat() - .asInstanceOf[HiveOutputFormat[AnyRef,Writable]] - } - format - } - - private def getOutputCommitter(): OutputCommitter = { - if (committer == null) { - committer = conf.value.getOutputCommitter - } - committer - } - - private def getJobContext(): JobContext = { - if (jobContext == null) { - jobContext = newJobContext(conf.value, jID.value) - } - jobContext - } - - private def getTaskContext(): TaskAttemptContext = { - if (taskContext == null) { - taskContext = newTaskAttemptContext(conf.value, taID.value) - } - taskContext - } - private def setIDs(jobId: Int, splitId: Int, attemptId: Int) { jobID = jobId splitID = splitId @@ -216,7 +142,7 @@ private[hive] class SparkHiveHadoopWriter( } } -private[hive] object SparkHiveHadoopWriter { +private[hive] object SparkHiveWriterContainer { def createPathFromString(path: String, conf: JobConf): Path = { if (path == null) { throw new IllegalArgumentException("Output path is null") @@ -226,6 +152,59 @@ private[hive] object SparkHiveHadoopWriter { if (outputPath == null || fs == null) { throw new IllegalArgumentException("Incorrectly formatted output path") } - outputPath.makeQualified(fs) + outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + } +} + +private[spark] class SparkHiveDynamicPartitionWriterContainer( + @transient jobConf: JobConf, + fileSinkConf: FileSinkDesc, + dynamicPartColNames: Array[String], + defaultPartName: String) + extends SparkHiveWriterContainer(jobConf, fileSinkConf) { + + @transient var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ + + override def open(): Unit = { + writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter] + } + + override def close(): Unit = { + writers.values.foreach(_.close(false)) + } + + override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = { + val dynamicPartPath = dynamicPartColNames + .zip(row.takeRight(dynamicPartColNames.length)) + .map { case (col, rawVal) => + val string = String.valueOf(rawVal) + s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}" + } + .mkString + + val path = { + val outputPath = FileOutputFormat.getOutputPath(conf.value) + assert(outputPath != null, "Undefined job output-path") + val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) + new Path(workPath, getOutputName) + } + + def newWriter = { + val newFileSinkDesc = new FileSinkDesc( + fileSinkConf.getDirName + dynamicPartPath, + fileSinkConf.getTableInfo, + fileSinkConf.getCompressed) + newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec) + newFileSinkDesc.setCompressType(fileSinkConf.getCompressType) + HiveFileFormatUtils.getHiveRecordWriter( + conf.value, + fileSinkConf.getTableInfo, + conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], + newFileSinkDesc, + path, + Reporter.NULL) + } + + writers.getOrElseUpdate(dynamicPartPath, newWriter) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 12b4c3eec6ab0..c88ae70063b4b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -18,30 +18,29 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConversions._ - -import java.util.{HashMap => JHashMap} +import scala.collection.mutable import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.metastore.MetaStoreUtils -import org.apache.hadoop.hive.ql.Context -import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.metadata.Hive import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import org.apache.hadoop.hive.ql.{Context, ErrorMsg} import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption -import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector -import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} -import org.apache.spark.{SerializableWritable, SparkException, TaskContext} +import org.apache.spark.SparkContext._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} -import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter} -import org.apache.hadoop.hive.conf.HiveConf +import org.apache.spark.sql.hive._ +import org.apache.spark.{SerializableWritable, SparkException, TaskContext} /** * :: DeveloperApi :: @@ -102,30 +101,23 @@ case class InsertIntoHiveTable( obj } - /** - * since we should get directory of dynamic partition from upstream RDD - * reference the code "serializer.serialize(outputData, standardOI) -> dynamicPartPath" - * So The type of the elment in RDD is (Writable, String) - */ def saveAsHiveFile( - rdd: RDD[(Writable, String)], + rdd: RDD[Row], valueClass: Class[_], fileSinkConf: FileSinkDesc, conf: SerializableWritable[JobConf], isCompressed: Boolean, - dynamicPartNum: Int) { - if (valueClass == null) { - throw new SparkException("Output value class not set") - } + writerContainer: SparkHiveWriterContainer) { + assert(valueClass != null, "Output value class not set") conf.value.setOutputValueClass(valueClass) - if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) { - throw new SparkException("Output format class not set") - } + + assert(fileSinkConf.getTableInfo.getOutputFileFormatClassName != null) // Doesn't work in Scala 2.9 due to what may be a generics bug // TODO: Should we uncomment this for Scala 2.10? 
// conf.setOutputFormat(outputFormatClass) - conf.value.set("mapred.output.format.class", - fileSinkConf.getTableInfo.getOutputFileFormatClassName) + conf.value.set( + "mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) + if (isCompressed) { // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", // and "mapred.output.compression.type" have no impact on ORC because it uses table properties @@ -139,101 +131,44 @@ case class InsertIntoHiveTable( FileOutputFormat.setOutputPath( conf.value, - SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf.value)) + SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value)) log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) - var writer: SparkHiveHadoopWriter = null - // Map restore writesr for Dynamic Partition - var writerMap: scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] = null - if (dynamicPartNum == 0) { - writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf) - writer.preSetup() - sc.sparkContext.runJob(rdd, writeToFile _) - writer.commitJob() - } else { - writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] - sc.sparkContext.runJob(rdd, writeToFile _) - for ((k,v) <- writerMap) { - v.commitJob() - } - writerMap.clear() - } - def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) { + writerContainer.driverSideSetup() + sc.sparkContext.runJob(rdd, writeToFile _) + writerContainer.commitJob() + + // Note that this function is executed on executor side + def writeToFile(context: TaskContext, iterator: Iterator[Row]) { + val serializer = newSerializer(fileSinkConf.getTableInfo) + val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val outputData = new Array[Any](fieldOIs.length) + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. 
val attemptNumber = (context.attemptId % Int.MaxValue).toInt + writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber) + writerContainer.open() - if (dynamicPartNum == 0) { // for All static partition - writer.setup(context.stageId, context.partitionId, attemptNumber) - writer.open() - // writer for Dynamic Partition - while(iter.hasNext) { - val record = iter.next() - writer.write(record._1) - } - writer.close() - writer.commit() - } else { // if there is dynamic Partition - while(iter.hasNext) { - val record = iter.next() - val location = fileSinkConf.getDirName - val partLocation = location + record._2 // different writer related with different file - def createNewWriter(): SparkHiveHadoopWriter = { - val tempWriter = new SparkHiveHadoopWriter(conf.value, - new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false)) - tempWriter.setup(context.stageId, context.partitionId, attemptNumber) - tempWriter.open(record._2) - writerMap += (record._2 -> tempWriter) - tempWriter - } - val writer2 = writerMap.getOrElseUpdate(record._2, createNewWriter) - writer2.write(record._1) - } - for ((k,v) <- writerMap) { - v.close() - v.commit() + iterator.foreach { row => + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = wrap(row(i), fieldOIs(i)) + i += 1 } - } - } - } - /** - * Returns the Dynamic partition directory for the given row. - * @param partCols an array containing the string names of the partition columns - * - * we get the last dynamicPartNum elements from partCols and - * last dynamicPartNum elements from the current row, - * then we can construct a String for dynamic partition directory - * For example: - * for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ... - * return: /part1=val1/part2=val2 - * for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ... - * return: /part2=val2 - * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ... - * return: /part2=val2/part3=val3 - */ - private def getDynamicPartDir(partCols: Array[String], - row: Row, - dynamicPartNum: Int, - defaultPartName: String): String = { - assert(dynamicPartNum > 0) - // TODO needs optimization - partCols - .takeRight(dynamicPartNum) - .zip(row.takeRight(dynamicPartNum)) - .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" } - .mkString - } + val writer = writerContainer.getLocalFileWriter(row) + writer.write(serializer.serialize(outputData, standardOI)) + } - /** - * Returns `rowVal` as a String. - * If `rowVal` is null or equal to "", returns the default partition name. - */ - private def handleNull(rowVal: Any, defaultPartName: String): String = { - if (rowVal == null ||String.valueOf(rowVal).length == 0) { - defaultPartName - } else { - String.valueOf(rowVal) + writerContainer.close() + writerContainer.commit() } } @@ -247,9 +182,6 @@ case class InsertIntoHiveTable( * Note: this is run once and then kept to avoid double insertions. */ private lazy val result: RDD[Row] = { - val childRdd = child.execute() - assert(childRdd != null) - // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. 
val tableDesc = table.tableDesc @@ -257,80 +189,62 @@ case class InsertIntoHiveTable( val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) - val numDynamicPartitions = partition.values.filter(_.isEmpty).size - val numStaticPartitions = partition.values.filter(_.isDefined).size + val numDynamicPartitions = partition.values.count(_.isEmpty) + val numStaticPartitions = partition.values.count(_.nonEmpty) val partitionSpec = partition.map { - case (key, Some(value)) => - key -> value - case (key, None) => - key -> "" + case (key, Some(value)) => key -> value + case (key, None) => key -> "" } - val jobConf = new JobConf(sc.hiveconf) - val jobConfSer = new SerializableWritable(jobConf) - // check if the partition spec is valid + // All partition column names in the format of "//..." + val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") + val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull + + // Validate partition spec if there exist any dynamic partitions if (numDynamicPartitions > 0) { + // Report error if dynamic partitioning is not enabled if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) { - throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()) + throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg) } + + // Report error if dynamic partition strict mode is on but no static partition is found if (numStaticPartitions == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) { - throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg()) + throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg) } - // check if static partition appear after dynamic partitions - var tmpNumStaticPartitions = numStaticPartitions - for ((k,v) <- partitionSpec) { - if (partitionSpec(k) == "") { - if (tmpNumStaticPartitions > 0) { // found a DP, but there exists ST as subpartition - throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg()) - } - } else { - tmpNumStaticPartitions -= 1 - } + + // Report error if any static partition appears after a dynamic partition + val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) + isDynamic.init.zip(isDynamic.tail).find(_ == (true, false)).foreach { _ => + throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) } } - val rdd = childRdd.mapPartitions { iter => - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] + val jobConf = new JobConf(sc.hiveconf) + val jobConfSer = new SerializableWritable(jobConf) - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray - val outputData = new Array[Any](fieldOIs.length) - val defaultPartName = jobConfSer.value.get( - HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, - HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultVal) - - val partitionColumns = fileSinkConf.getTableInfo. 
- getProperties.getProperty("partition_columns") // a String like "colname1/colname2" - val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull - - iter.map { row => - var dynamicPartPath: String = null - if (numDynamicPartitions > 0) { - dynamicPartPath = getDynamicPartDir(partitionColumnNames, row, - numDynamicPartitions, defaultPartName) - } - var i = 0 - while (i < fieldOIs.length) { - // Casts Strings to HiveVarchars when necessary. - outputData(i) = wrap(row(i), fieldOIs(i)) - i += 1 - } - // pass the dynamicPartPath to downStream RDD - serializer.serialize(outputData, standardOI) -> dynamicPartPath - } + val defaultPartName = jobConf.get( + ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal) + val writerContainer = if (numDynamicPartitions > 0) { + new SparkHiveDynamicPartitionWriterContainer( + jobConf, + fileSinkConf, + partitionColumnNames.takeRight(numDynamicPartitions), + defaultPartName) + } else { + new SparkHiveWriterContainer(jobConf, fileSinkConf) } + + val isCompressed = jobConf.getBoolean( + ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) + saveAsHiveFile( - rdd, + child.execute(), outputClass, fileSinkConf, jobConfSer, - sc.hiveconf.getBoolean("hive.exec.compress.output", false), - numDynamicPartitions) + isCompressed, + writerContainer) val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. From 528e84c44ce885accbb6a953fa604e5ad664032d Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 16 Sep 2014 18:32:12 -0700 Subject: [PATCH 2/5] Fixes typo in test name, regenerated golden answer files --- ...tition-0-be33aaa7253c8f248ff3921cd7dae340} | 0 ...tition-1-640552dd462707563fd255a713f83b41} | 0 ...tition-2-36456c9d0d2e3ef72ab5ba9ba48e5493} | 0 ...tition-3-b7f7fa7ebf666f4fee27e149d8c6961f} | 0 ...tition-4-8bdb71ad8cb3cc3026043def2525de3a} | 0 ...tition-5-c630dce438f3792e7fb0f523fbbb3e1e} | 0 ...tition-6-7abc9ec8a36cdc5e89e955265a7fd7cf} | 0 ...tition-7-be33aaa7253c8f248ff3921cd7dae340} | 0 .../sql/hive/execution/HiveQuerySuite.scala | 34 ++++++++++++------- 9 files changed, 22 insertions(+), 12 deletions(-) rename sql/hive/src/test/resources/golden/{dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef => dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02 => dynamic_partition-1-640552dd462707563fd255a713f83b41} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-2-16367c381d4b189b3640c92511244bfe => dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318 => dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182 => dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee => dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed => dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef => dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340} (100%) diff --git 
a/sql/hive/src/test/resources/golden/dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef b/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef rename to sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02 b/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02 rename to sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe b/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe rename to sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318 b/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318 rename to sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182 b/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182 rename to sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee b/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee rename to sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed b/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed rename to sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef b/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef rename to sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 31e7263f0edde..21f6f50f5eb15 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -255,18 +255,6 @@ class HiveQuerySuite extends HiveComparisonTest { |insert overwrite table src_lv2 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX """.stripMargin) - createQueryTest("dynamic_partiton", - """ - |drop table IF EXISTS dynamic_part_table; - |create table dynamic_part_table(intcol int) partitioned by (partcol1 int, partcol2 int); - |set hive.exec.dynamic.partition.mode=nonstrict; - |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, 1, 1 from src where key=150; - |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, NULL, 1 from src where key=150; - |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, 1, NULL from src where key=150; - |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, NULL, NULL from src where key=150; - |drop table IF EXISTS dynamic_part_table; - """.stripMargin) - createQueryTest("lateral view5", "FROM src SELECT explode(array(key+3, key+4))") @@ -534,6 +522,28 @@ class HiveQuerySuite extends HiveComparisonTest { case class LogEntry(filename: String, message: String) case class LogFile(name: String) + createQueryTest("dynamic_partition", + """ + |DROP TABLE IF EXISTS dynamic_part_table; + |CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT); + | + |SET hive.exec.dynamic.partition.mode=nonstrict; + | + |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, 1, 1 FROM src WHERE key=150; + | + |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, NULL, 1 FROM src WHERE key=150; + | + |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, 1, NULL FROM src WHERE key=150; + | + |INSERT INTO TABLe dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, NULL, NULL FROM src WHERE key=150; + | + |DROP TABLE IF EXISTS dynamic_part_table; + """.stripMargin) + test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") { sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs") sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles") From fae9eff4f8d68e2ba0816d859fb7f4171c56b995 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 16 Sep 2014 22:13:36 -0700 Subject: [PATCH 3/5] Refactors InsertIntoHiveTable to a Command --- .../sql/hive/execution/InsertIntoHiveTable.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index c88ae70063b4b..8e9f7e5aa7374 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConversions._ -import scala.collection.mutable import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} import org.apache.hadoop.hive.conf.HiveConf @@ -31,14 +30,12 @@ import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import 
org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector} -import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} -import org.apache.spark.SparkContext._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} +import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode} import org.apache.spark.sql.hive._ import org.apache.spark.{SerializableWritable, SparkException, TaskContext} @@ -52,7 +49,7 @@ case class InsertIntoHiveTable( child: SparkPlan, overwrite: Boolean) (@transient sc: HiveContext) - extends UnaryNode { + extends UnaryNode with Command { @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass @transient private lazy val hiveContext = new Context(sc.hiveconf) @@ -172,8 +169,6 @@ case class InsertIntoHiveTable( } } - override def execute() = result - /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the @@ -181,7 +176,7 @@ case class InsertIntoHiveTable( * * Note: this is run once and then kept to avoid double insertions. */ - private lazy val result: RDD[Row] = { + override protected[sql] lazy val sideEffectResult: Seq[Row] = { // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. val tableDesc = table.tableDesc @@ -293,6 +288,6 @@ case class InsertIntoHiveTable( // however for now we return an empty list to simplify compatibility checks with hive, which // does not return anything for insert operations. // TODO: implement hive compatibility as rules. 
- sc.sparkContext.makeRDD(Nil, 1) + Seq.empty[Row] } } From 50045422630711cf92768b30e305d0dde4ee04c8 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 16 Sep 2014 22:41:56 -0700 Subject: [PATCH 4/5] Minor refactoring --- .../spark/sql/hive/SparkHadoopWriter.scala | 9 +++-- .../hive/execution/InsertIntoHiveTable.scala | 33 +++++-------------- 2 files changed, 15 insertions(+), 27 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala index 6e07b51f49230..5fdca4fcdabe5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala @@ -24,6 +24,7 @@ import java.util.Date import scala.collection.mutable import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} import org.apache.hadoop.hive.ql.plan.FileSinkDesc @@ -159,11 +160,13 @@ private[hive] object SparkHiveWriterContainer { private[spark] class SparkHiveDynamicPartitionWriterContainer( @transient jobConf: JobConf, fileSinkConf: FileSinkDesc, - dynamicPartColNames: Array[String], - defaultPartName: String) + dynamicPartColNames: Array[String]) extends SparkHiveWriterContainer(jobConf, fileSinkConf) { - @transient var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ + private val defaultPartName = jobConf.get( + ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal) + + @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ override def open(): Unit = { writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 8e9f7e5aa7374..ea884dc4ffa24 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -103,17 +103,16 @@ case class InsertIntoHiveTable( valueClass: Class[_], fileSinkConf: FileSinkDesc, conf: SerializableWritable[JobConf], - isCompressed: Boolean, writerContainer: SparkHiveWriterContainer) { assert(valueClass != null, "Output value class not set") conf.value.setOutputValueClass(valueClass) - assert(fileSinkConf.getTableInfo.getOutputFileFormatClassName != null) - // Doesn't work in Scala 2.9 due to what may be a generics bug - // TODO: Should we uncomment this for Scala 2.10? 
- // conf.setOutputFormat(outputFormatClass) - conf.value.set( - "mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) + val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName + assert(outputFileFormatClassName != null, "Output format class not set") + conf.value.set("mapred.output.format.class", outputFileFormatClassName) + + val isCompressed = conf.value.getBoolean( + ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) if (isCompressed) { // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", @@ -218,28 +217,14 @@ case class InsertIntoHiveTable( val jobConf = new JobConf(sc.hiveconf) val jobConfSer = new SerializableWritable(jobConf) - val defaultPartName = jobConf.get( - ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal) val writerContainer = if (numDynamicPartitions > 0) { - new SparkHiveDynamicPartitionWriterContainer( - jobConf, - fileSinkConf, - partitionColumnNames.takeRight(numDynamicPartitions), - defaultPartName) + val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions) + new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames) } else { new SparkHiveWriterContainer(jobConf, fileSinkConf) } - val isCompressed = jobConf.getBoolean( - ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) - - saveAsHiveFile( - child.execute(), - outputClass, - fileSinkConf, - jobConfSer, - isCompressed, - writerContainer) + saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. From 1093c2074ddd81b07c28bfb5369ff0b39be8f424 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 17 Sep 2014 00:28:57 -0700 Subject: [PATCH 5/5] Adds more tests --- .../sql/hive/execution/HiveQuerySuite.scala | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 21f6f50f5eb15..cb1e56846c49a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution import scala.util.Try +import org.apache.spark.SparkException import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ @@ -334,7 +335,7 @@ class HiveQuerySuite extends HiveComparisonTest { def isExplanation(result: SchemaRDD) = { val explanation = result.select('plan).collect().map { case Row(plan: String) => plan } - explanation.exists(_ == "== Physical Plan ==") + explanation.contains("== Physical Plan ==") } test("SPARK-1704: Explain commands as a SchemaRDD") { @@ -544,6 +545,30 @@ class HiveQuerySuite extends HiveComparisonTest { |DROP TABLE IF EXISTS dynamic_part_table; """.stripMargin) + test("Partition spec validation") { + sql("DROP TABLE IF EXISTS dp_test") + sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)") + sql("SET hive.exec.dynamic.partition.mode=strict") + + // Should throw when using strict dynamic partition mode without any static partition + intercept[SparkException] { + sql( + """INSERT INTO TABLE dp_test 
PARTITION(dp) + |SELECT key, value, key % 5 FROM src + """.stripMargin) + } + + sql("SET hive.exec.dynamic.partition.mode=nonstrict") + + // Should throw when a static partition appears after a dynamic partition + intercept[SparkException] { + sql( + """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1) + |SELECT key, value, key % 5 FROM src + """.stripMargin) + } + } + test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") { sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs") sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles") @@ -601,27 +626,27 @@ class HiveQuerySuite extends HiveComparisonTest { assert(sql("SET").collect().size == 0) assertResult(Set(testKey -> testVal)) { - collectResults(hql(s"SET $testKey=$testVal")) + collectResults(sql(s"SET $testKey=$testVal")) } assert(hiveconf.get(testKey, "") == testVal) assertResult(Set(testKey -> testVal)) { - collectResults(hql("SET")) + collectResults(sql("SET")) } sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { - collectResults(hql("SET")) + collectResults(sql("SET")) } // "set key" assertResult(Set(testKey -> testVal)) { - collectResults(hql(s"SET $testKey")) + collectResults(sql(s"SET $testKey")) } assertResult(Set(nonexistentKey -> "")) { - collectResults(hql(s"SET $nonexistentKey")) + collectResults(sql(s"SET $nonexistentKey")) } // Assert that sql() should have the same effects as sql() by repeating the above using sql().
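
The refactoring above (PATCH 1/5, trimmed further in PATCH 4/5) replaces the old preSetup/setup/open/write sequence with a writer-container lifecycle that InsertIntoHiveTable drives once on the driver and once per task. The following self-contained Scala sketch only illustrates that calling order; FakeRecordWriter and FakeWriterContainer are made-up stand-ins with none of the Hadoop/Hive machinery, not the classes added by the patch.

import scala.collection.mutable

// Hypothetical stand-in for FileSinkOperator.RecordWriter, used only to show call ordering.
class FakeRecordWriter(path: String) {
  private val rows = mutable.Buffer.empty[String]
  def write(value: String): Unit = rows += value
  def close(): Unit = println(s"closed $path with ${rows.size} row(s)")
}

// Hypothetical stand-in for SparkHiveWriterContainer.
class FakeWriterContainer {
  private var writer: FakeRecordWriter = _
  def driverSideSetup(): Unit = println("driver: setupJob")
  def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int): Unit =
    println(s"task $splitId: setupTask (attempt $attemptId)")
  def open(): Unit = writer = new FakeRecordWriter("part-00000")
  def getLocalFileWriter(row: Seq[Any]): FakeRecordWriter = writer
  def close(): Unit = writer.close()
  def commit(): Unit = println("task: commitTask")
  def commitJob(): Unit = println("driver: commitJob")
}

object WriterContainerLifecycleDemo {
  def main(args: Array[String]): Unit = {
    val container = new FakeWriterContainer
    container.driverSideSetup()                      // once, on the driver
    // What writeToFile does for each task on the executor side:
    container.executorSideSetup(jobId = 0, splitId = 0, attemptId = 0)
    container.open()
    Seq(Seq(1, "a"), Seq(2, "b")).foreach { row =>
      container.getLocalFileWriter(row).write(row.mkString(","))
    }
    container.close()
    container.commit()
    container.commitJob()                            // once, back on the driver
  }
}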
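
The directory derivation in SparkHiveDynamicPartitionWriterContainer.getLocalFileWriter can be checked in isolation. Below is a minimal sketch of just the "/col=value" path logic, assuming the dynamic partition values arrive as the trailing fields of each row; the default partition name used is Hive's usual __HIVE_DEFAULT_PARTITION__, passed in explicitly so the sketch needs no Hive configuration.

object DynamicPartPathSketch {
  // Mirrors the "/col=value" segments built in getLocalFileWriter, without any Hive dependency.
  def dynamicPartPath(
      row: Seq[Any],
      dynamicPartColNames: Seq[String],
      defaultPartName: String): String = {
    dynamicPartColNames
      .zip(row.takeRight(dynamicPartColNames.length))
      .map { case (col, rawVal) =>
        val string = if (rawVal == null) "" else rawVal.toString
        // Null or empty partition values fall back to the default partition name.
        s"/$col=${if (string.isEmpty) defaultPartName else string}"
      }
      .mkString
  }

  def main(args: Array[String]): Unit = {
    val defaultPartName = "__HIVE_DEFAULT_PARTITION__"
    // Row layout: data columns first, dynamic partition columns (partcol1, partcol2) last.
    println(dynamicPartPath(Seq(1, 1, 1), Seq("partcol1", "partcol2"), defaultPartName))
    // prints /partcol1=1/partcol2=1
    println(dynamicPartPath(Seq(1, null, 1), Seq("partcol1", "partcol2"), defaultPartName))
    // prints /partcol1=__HIVE_DEFAULT_PARTITION__/partcol2=1
  }
}

Each distinct path returned here corresponds to one entry in the writers map, so a task opens one RecordWriter per dynamic partition it actually touches.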
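
The partition-spec validation in InsertIntoHiveTable rejects any static partition that follows a dynamic one by scanning adjacent pairs of partition columns. The sketch below isolates that pairwise check, using the same empty-string-means-dynamic convention as the partitionSpec map built in the patch; the column names are made up for the example.

object PartitionSpecCheck {
  // True when a static partition column follows a dynamic one, which Hive does not allow.
  def hasStaticAfterDynamic(
      partitionColumnNames: Seq[String],
      partitionSpec: Map[String, String]): Boolean = {
    val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
    // A (true, false) pair means: this column is dynamic but the next one is static.
    isDynamic.init.zip(isDynamic.tail).contains((true, false))
  }

  def main(args: Array[String]): Unit = {
    val cols = Seq("partcol1", "partcol2")
    // Static before dynamic is fine:
    println(hasStaticAfterDynamic(cols, Map("partcol1" -> "1", "partcol2" -> "")))  // false
    // Dynamic before static is what triggers ErrorMsg.PARTITION_DYN_STA_ORDER in the patch:
    println(hasStaticAfterDynamic(cols, Map("partcol1" -> "", "partcol2" -> "1")))  // true
  }
}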