From 27540ba5596f8018124e91265dc8709df13363a3 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Mon, 21 Jul 2014 13:33:33 +0800 Subject: [PATCH] Fix the TableScan Bug while partition serde differs --- .../apache/spark/sql/hive/TableReader.scala | 105 ++++++++++++------ .../sql/hive/execution/HiveTableScan.scala | 90 ++------------- 2 files changed, 81 insertions(+), 114 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index c3942578d6b5a..09e826c506613 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -24,6 +24,8 @@ import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable} import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.serde2.Deserializer +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector + import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf} @@ -31,13 +33,16 @@ import org.apache.spark.SerializableWritable import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Row, GenericMutableRow, Literal, Cast} +import org.apache.spark.sql.catalyst.types.DataType + /** * A trait for subclasses that handle table scans. */ private[hive] sealed trait TableReader { - def makeRDDForTable(hiveTable: HiveTable): RDD[_] + def makeRDDForTable(hiveTable: HiveTable): RDD[Row] - def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_] + def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row] } @@ -46,7 +51,8 @@ private[hive] sealed trait TableReader { * data warehouse directory. */ private[hive] -class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveContext) +class HadoopTableReader(@transient attributes: Seq[Attribute], + @transient relation: MetastoreRelation, @transient sc: HiveContext) extends TableReader { // Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless @@ -63,10 +69,10 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon def hiveConf = _broadcastedHiveConf.value.value - override def makeRDDForTable(hiveTable: HiveTable): RDD[_] = + override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] = makeRDDForTable( hiveTable, - _tableDesc.getDeserializerClass.asInstanceOf[Class[Deserializer]], + relation.tableDesc.getDeserializerClass.asInstanceOf[Class[Deserializer]], filterOpt = None) /** @@ -81,14 +87,14 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon def makeRDDForTable( hiveTable: HiveTable, deserializerClass: Class[_ <: Deserializer], - filterOpt: Option[PathFilter]): RDD[_] = { + filterOpt: Option[PathFilter]): RDD[Row] = { assert(!hiveTable.isPartitioned, """makeRDDForTable() cannot be called on a partitioned table, since input formats may differ across partitions. Use makeRDDForTablePartitions() instead.""") // Create local references to member variables, so that the entire `this` object won't be // serialized in the closure below. - val tableDesc = _tableDesc + val tableDesc = relation.tableDesc val broadcastedHiveConf = _broadcastedHiveConf val tablePath = hiveTable.getPath @@ -99,23 +105,20 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) + val attrsWithIndex = attributes.zipWithIndex + val mutableRow = new GenericMutableRow(attrsWithIndex.length) val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => val hconf = broadcastedHiveConf.value.value val deserializer = deserializerClass.newInstance() deserializer.initialize(hconf, tableDesc.getProperties) - // Deserialize each Writable to get the row value. - iter.map { - case v: Writable => deserializer.deserialize(v) - case value => - sys.error(s"Unable to deserialize non-Writable: $value of ${value.getClass.getName}") - } + HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow) } deserializedHadoopRDD } - override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_] = { + override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row] = { val partitionToDeserializer = partitions.map(part => (part, part.getDeserializer.getClass.asInstanceOf[Class[Deserializer]])).toMap makeRDDForPartitionedTable(partitionToDeserializer, filterOpt = None) @@ -133,8 +136,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon */ def makeRDDForPartitionedTable( partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]], - filterOpt: Option[PathFilter]): RDD[_] = { - + filterOpt: Option[PathFilter]): RDD[Row] = { val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) => val partDesc = Utilities.getPartitionDesc(partition) val partPath = partition.getPartitionPath @@ -156,33 +158,42 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon } // Create local references so that the outer object isn't serialized. - val tableDesc = _tableDesc + val tableDesc = relation.tableDesc val broadcastedHiveConf = _broadcastedHiveConf val localDeserializer = partDeserializer + val mutableRow = new GenericMutableRow(attributes.length) + + // split the attributes (output schema) into 2 categories: + // (partition keys, ordinal), (normal attributes, ordinal), the ordinal mean the + // index of the attribute in the output Row. + val (partitionKeys, attrs) = attributes.zipWithIndex.partition(attr => { + relation.partitionKeys.indexOf(attr._1) >= 0 + }) + + def fillPartitionKeys(parts: Array[String], row: GenericMutableRow) = { + partitionKeys.foreach { case (attr, ordinal) => + // get partition key ordinal for a given attribute + val partOridinal = relation.partitionKeys.indexOf(attr) + row(ordinal) = Cast(Literal(parts(partOridinal)), attr.dataType).eval(null) + } + } + // fill the partition key for the given MutableRow Object + fillPartitionKeys(partValues, mutableRow) val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) hivePartitionRDD.mapPartitions { iter => val hconf = broadcastedHiveConf.value.value - val rowWithPartArr = new Array[Object](2) - - // The update and deserializer initialization are intentionally - // kept out of the below iter.map loop to save performance. - rowWithPartArr.update(1, partValues) val deserializer = localDeserializer.newInstance() deserializer.initialize(hconf, partProps) - // Map each tuple to a row object - iter.map { value => - val deserializedRow = deserializer.deserialize(value) - rowWithPartArr.update(0, deserializedRow) - rowWithPartArr.asInstanceOf[Object] - } + // fill the non partition key attributes + HadoopTableReader.fillObject(iter, deserializer, attrs, mutableRow) } }.toSeq // Even if we don't use any partitions, we still need an empty RDD if (hivePartitionRDDs.size == 0) { - new EmptyRDD[Object](sc.sparkContext) + new EmptyRDD[Row](sc.sparkContext) } else { new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs) } @@ -225,10 +236,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon // Only take the value (skip the key) because Hive works only with values. rdd.map(_._2) } - } -private[hive] object HadoopTableReader { +private[hive] object HadoopTableReader extends HiveInspectors { /** * Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to * instantiate a HadoopRDD. @@ -241,4 +251,37 @@ private[hive] object HadoopTableReader { val bufferSize = System.getProperty("spark.buffer.size", "65536") jobConf.set("io.file.buffer.size", bufferSize) } + + /** + * Transform the raw data(Writable object) into the Row object for an iterable input + * @param iter Iterable input which represented as Writable object + * @param deserializer Deserializer associated with the input writable object + * @param attrs Represents the row attribute names and its zero-based position in the MutableRow + * @param row reusable MutableRow object + * + * @return Iterable Row object that transformed from the given iterable input. + */ + def fillObject(iter: Iterator[Writable], deserializer: Deserializer, + attrs: Seq[(Attribute, Int)], row: GenericMutableRow): Iterator[Row] = { + val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector] + // get the field references according to the attributes(output of the reader) required + val fieldRefs = attrs.map { case (attr, idx) => (soi.getStructFieldRef(attr.name), idx) } + + // Map each tuple to a row object + iter.map { value => + val raw = deserializer.deserialize(value) + var idx = 0; + while (idx < fieldRefs.length) { + val fieldRef = fieldRefs(idx)._1 + val fieldIdx = fieldRefs(idx)._2 + val fieldValue = soi.getStructFieldData(raw, fieldRef) + + row(fieldIdx) = unwrapData(fieldValue, fieldRef.getFieldObjectInspector()) + + idx += 1 + } + + row: Row + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index e7016fa16eea9..e8a6f7b070257 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive._ -import org.apache.spark.util.MutablePair /** * :: DeveloperApi :: @@ -50,8 +49,7 @@ case class HiveTableScan( relation: MetastoreRelation, partitionPruningPred: Option[Expression])( @transient val context: HiveContext) - extends LeafNode - with HiveInspectors { + extends LeafNode { require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, "Partition pruning predicates only supported for partitioned tables.") @@ -67,42 +65,7 @@ case class HiveTableScan( } @transient - private[this] val hadoopReader = new HadoopTableReader(relation.tableDesc, context) - - /** - * The hive object inspector for this table, which can be used to extract values from the - * serialized row representation. - */ - @transient - private[this] lazy val objectInspector = - relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector] - - /** - * Functions that extract the requested attributes from the hive output. Partitioned values are - * casted from string to its declared data type. - */ - @transient - protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = { - attributes.map { a => - val ordinal = relation.partitionKeys.indexOf(a) - if (ordinal >= 0) { - val dataType = relation.partitionKeys(ordinal).dataType - (_: Any, partitionKeys: Array[String]) => { - castFromString(partitionKeys(ordinal), dataType) - } - } else { - val ref = objectInspector.getAllStructFieldRefs - .find(_.getFieldName == a.name) - .getOrElse(sys.error(s"Can't find attribute $a")) - val fieldObjectInspector = ref.getFieldObjectInspector - - (row: Any, _: Array[String]) => { - val data = objectInspector.getStructFieldData(row, ref) - unwrapData(data, fieldObjectInspector) - } - } - } - } + private[this] val hadoopReader = new HadoopTableReader(attributes, relation, context) private[this] def castFromString(value: String, dataType: DataType) = { Cast(Literal(value), dataType).eval(null) @@ -114,6 +77,7 @@ case class HiveTableScan( val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",") if (attributes.size == relation.output.size) { + // TODO what if duplicated attributes queried? ColumnProjectionUtils.setFullyReadColumns(hiveConf) } else { ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs) @@ -140,12 +104,6 @@ case class HiveTableScan( addColumnMetadataToConf(context.hiveconf) - private def inputRdd = if (!relation.hiveQlTable.isPartitioned) { - hadoopReader.makeRDDForTable(relation.hiveQlTable) - } else { - hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions)) - } - /** * Prunes partitions not involve the query plan. * @@ -169,44 +127,10 @@ case class HiveTableScan( } } - override def execute() = { - inputRdd.mapPartitions { iterator => - if (iterator.isEmpty) { - Iterator.empty - } else { - val mutableRow = new GenericMutableRow(attributes.length) - val mutablePair = new MutablePair[Any, Array[String]]() - val buffered = iterator.buffered - - // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern - // matching are avoided intentionally. - val rowsAndPartitionKeys = buffered.head match { - // With partition keys - case _: Array[Any] => - buffered.map { case array: Array[Any] => - val deserializedRow = array(0) - val partitionKeys = array(1).asInstanceOf[Array[String]] - mutablePair.update(deserializedRow, partitionKeys) - } - - // Without partition keys - case _ => - val emptyPartitionKeys = Array.empty[String] - buffered.map { deserializedRow => - mutablePair.update(deserializedRow, emptyPartitionKeys) - } - } - - rowsAndPartitionKeys.map { pair => - var i = 0 - while (i < attributes.length) { - mutableRow(i) = attributeFunctions(i)(pair._1, pair._2) - i += 1 - } - mutableRow: Row - } - } - } + override def execute() = if (!relation.hiveQlTable.isPartitioned) { + hadoopReader.makeRDDForTable(relation.hiveQlTable) + } else { + hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions)) } override def output = attributes