Using ColumnProjectionUtils to optimise RCFile and ORC column pruning
liancheng committed May 28, 2014
1 parent eb62fd3 commit 6d1c642
Showing 1 changed file with 42 additions and 6 deletions.
@@ -18,15 +18,18 @@
package org.apache.spark.sql.hive.execution

import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred._

@@ -119,6 +122,38 @@ case class HiveTableScan(
Cast(Literal(value), dataType).eval(null)
}

private def addColumnMetadataToConf(hiveConf: HiveConf) {
// Specifies IDs and internal names of columns to be scanned.
val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer)
val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")

if (attributes.size == relation.output.size) {
ColumnProjectionUtils.setFullyReadColumns(hiveConf)
} else {
ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
}

ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))

// Specifies types and object inspectors of columns to be scanned.
val structOI = ObjectInspectorUtils
.getStandardObjectInspector(
relation.tableDesc.getDeserializer.getObjectInspector,
ObjectInspectorCopyOption.JAVA)
.asInstanceOf[StructObjectInspector]

val columnTypeNames = structOI
.getAllStructFieldRefs
.map(_.getFieldObjectInspector)
.map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
.mkString(",")

hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
}

addColumnMetadataToConf(sc.hiveconf)

@transient
def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
hadoopReader.makeRDDForTable(relation.hiveQlTable)
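
For orientation, the sketch below shows the configuration state that addColumnMetadataToConf leaves behind for a hypothetical table with schema (key INT, value STRING, ds STRING) when only value is scanned. The object name ColumnPruningSketch, the table, and the expected output are illustrative assumptions; the property names hive.io.file.readcolumn.ids and hive.io.file.readcolumn.names are the keys ColumnProjectionUtils is generally understood to write and the RCFile/ORC readers to consult, quoted from memory of the Hive 0.12-era API rather than from this commit.

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils

import scala.collection.JavaConversions._

object ColumnPruningSketch extends App {
  val hiveConf = new HiveConf()

  // Hypothetical table (key INT, value STRING, ds STRING); the query touches only
  // `value`, so the needed column IDs collapse to its ordinal in the table schema.
  val neededColumnIDs = Seq(1: Integer)
  ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
  ColumnProjectionUtils.appendReadColumnNames(hiveConf, Seq("value"))

  // Internal column names ("_col0", "_col1", ...) and types for every table column,
  // mirroring the serdeConstants lines of addColumnMetadataToConf.
  hiveConf.set(serdeConstants.LIST_COLUMNS,
    (0 until 3).map(HiveConf.getColumnInternalName(_)).mkString(","))
  hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, "int,string,string")

  // The record readers for columnar formats look these keys up (assumed names)
  // and skip columns that are not listed.
  println(hiveConf.get("hive.io.file.readcolumn.ids"))    // typically prints: 1
  println(hiveConf.get("hive.io.file.readcolumn.names"))  // typically prints: value
}

When every column of the relation is referenced, the patch takes the other branch and calls ColumnProjectionUtils.setFullyReadColumns instead, signalling the readers to do no pruning at all.
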
@@ -156,18 +191,19 @@ case class HiveTableScan(
} else {
val mutableRow = new GenericMutableRow(attributes.length)
val buffered = iterator.buffered

- (buffered.head match {
+ val rowsAndPartitionKeys = buffered.head match {
case Array(_, _) =>
buffered.map { case Array(deserializedRow, partitionKeys: Array[String]) =>
(deserializedRow, partitionKeys)
}

case _ =>
- buffered.map { deserializedRow =>
- (deserializedRow, Array.empty[String])
+ buffered.map {
+ (_, Array.empty[String])
}
- }).map { case (deserializedRow, partitionKeys: Array[String]) =>
+ }

+ rowsAndPartitionKeys.map { case (deserializedRow, partitionKeys) =>
var i = 0

while (i < attributes.length) {
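
The last hunk is a readability refactor rather than a behaviour change: elements coming out of the underlying RDD are either Array(deserializedRow, partitionKeys) for partitioned tables or bare deserialized rows otherwise, and both shapes are normalised to (row, partitionKeys) pairs before the mutable output row is filled in. The stripped-down sketch below replays that normalisation with plain strings standing in for deserialized rows; the object name and all data are hypothetical.

object PartitionKeyPairingSketch extends App {
  // Same shape-sniffing as the patch: peek at the first element, then map every
  // element to a (row, partitionKeys) pair, using an empty key array when the
  // table is unpartitioned.
  def pairWithPartitionKeys(iterator: Iterator[Any]): Iterator[(Any, Array[String])] = {
    val buffered = iterator.buffered
    buffered.head match {
      case Array(_, _) =>
        buffered.map { case Array(row, partitionKeys: Array[String]) => (row, partitionKeys) }
      case _ =>
        buffered.map { (_, Array.empty[String]) }
    }
  }

  // Elements as they would arrive from a partitioned table: Array(row, partitionKeys).
  val partitioned: Iterator[Any] = Iterator(
    Array("row-1", Array("2014-05-28")),
    Array("row-2", Array("2014-05-28")))

  // Elements from an unpartitioned table: just the rows.
  val unpartitioned: Iterator[Any] = Iterator("row-1", "row-2")

  pairWithPartitionKeys(partitioned).foreach { case (row, keys) =>
    println(s"$row -> [${keys.mkString(",")}]")   // row-1 -> [2014-05-28], ...
  }
  pairWithPartitionKeys(unpartitioned).foreach { case (row, keys) =>
    println(s"$row -> [${keys.mkString(",")}]")   // row-1 -> [], ...
  }
}
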
