From 3c9038e49a5370e861f63cbc6a40370a3d21163a Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Thu, 14 May 2015 13:19:30 -0700 Subject: [PATCH] resolve review comments --- .../sql/hive/orc/HadoopTypeConverter.scala | 39 +---- .../spark/sql/hive/orc/OrcFileOperator.scala | 22 +-- .../spark/sql/hive/orc/OrcFilters.scala | 133 ++++++------------ .../spark/sql/hive/orc/OrcRelation.scala | 94 +++++-------- .../sql/hive/orc/OrcTableOperations.scala | 51 +++---- .../apache/spark/sql/hive/orc/package.scala | 19 +-- .../hive/orc/OrcPartitionDiscoverySuite.scala | 7 +- .../spark/sql/hive/orc/OrcQuerySuite.scala | 29 +++- 8 files changed, 157 insertions(+), 237 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala index aabc5477b05a6..713c076aee457 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala @@ -17,13 +17,12 @@ package org.apache.spark.sql.hive.orc -import org.apache.hadoop.hive.common.`type`.HiveVarchar -import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} + import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.primitive._ -import org.apache.spark.sql.catalyst.expressions.{Row, MutableRow} -import scala.collection.JavaConversions._ +import org.apache.spark.sql.catalyst.expressions.MutableRow +import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} /** * We can consolidate TableReader.unwrappers and HiveInspectors.wrapperFor to use @@ -59,35 +58,5 @@ private[hive] object HadoopTypeConverter extends HiveInspectors { /** * Wraps with Hive types based on object inspector. 
*/ - def wrappers(oi: ObjectInspector): Any => Any = oi match { - case _: JavaHiveVarcharObjectInspector => - (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size) - - case _: JavaHiveDecimalObjectInspector => - (o: Any) => HiveShim.createDecimal(o.asInstanceOf[BigDecimal].underlying()) - - case soi: StandardStructObjectInspector => - val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector)) - (o: Any) => { - val struct = soi.create() - (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row].toSeq).zipped.foreach { - (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data)) - } - struct - } - - case loi: ListObjectInspector => - val wrapper = wrapperFor(loi.getListElementObjectInspector) - (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper)) - - case moi: MapObjectInspector => - val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector) - val valueWrapper = wrapperFor(moi.getMapValueObjectInspector) - (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) => - keyWrapper(key) -> valueWrapper(value) - }) - - case _ => - identity[Any] - } + def wrappers(oi: ObjectInspector): Any => Any = wrapperFor(oi) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 6805677e84ea9..4dd2d8951b728 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.hive.orc -import java.io.IOException - import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader} import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector + import org.apache.spark.Logging import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.hive.HiveMetastoreTypes @@ -31,7 +30,7 @@ import org.apache.spark.sql.types.StructType private[orc] object OrcFileOperator extends Logging{ def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = { - var conf = config.getOrElse(new Configuration) + val conf = config.getOrElse(new Configuration) val fspath = new Path(pathStr) val fs = fspath.getFileSystem(conf) val orcFiles = listOrcFiles(pathStr, conf) @@ -53,19 +52,6 @@ private[orc] object OrcFileOperator extends Logging{ readerInspector } - def deletePath(pathStr: String, conf: Configuration): Unit = { - val fspath = new Path(pathStr) - val fs = fspath.getFileSystem(conf) - try { - fs.delete(fspath, true) - } catch { - case e: IOException => - throw new IOException( - s"Unable to clear output directory ${fspath.toString} prior" - + s" to InsertIntoOrcTable:\n${e.toString}") - } - } - def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) @@ -80,8 +66,6 @@ private[orc] object OrcFileOperator extends Logging{ throw new IllegalArgumentException( s"orcFileOperator: path $path does not have valid orc files matching the pattern") } - logInfo("Qualified file list: ") - paths.foreach{x=>logInfo(x.toString)} paths } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index 0a9924b139a48..eda1cffe49810 
100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -22,100 +22,55 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
 import org.apache.spark.Logging
 import org.apache.spark.sql.sources._
 
-private[sql] object OrcFilters extends Logging {
-
+/**
+ * This could be optimized by pushing down partial filters, but we are conservative here:
+ * if any filter fails to be converted, the partially built SearchArgument tree may be
+ * corrupted and can no longer be used.
+ */
+private[orc] object OrcFilters extends Logging {
   def createFilter(expr: Array[Filter]): Option[SearchArgument] = {
-    if (expr == null || expr.size == 0) return None
-    var sarg: Option[Builder] = Some(SearchArgument.FACTORY.newBuilder())
-    sarg.get.startAnd()
-    expr.foreach {
-      x => {
-        sarg match {
-          case Some(s1) => sarg = createFilter(x, s1)
-          case _ => None
-        }
-      }
-    }
-    sarg match {
-      case Some(b) => Some(b.end.build)
-      case _ => None
+    if (expr.nonEmpty) {
+      expr.foldLeft(Some(SearchArgument.FACTORY.newBuilder().startAnd()): Option[Builder]) {
+        (maybeBuilder, e) => createFilter(e, maybeBuilder)
+      }.map(_.end().build())
+    } else {
+      None
     }
   }
 
-  def createFilter(expression: Filter, builder: Builder): Option[Builder] = {
-    expression match {
-      case p@And(left: Filter, right: Filter) => {
-        val b1 = builder.startAnd()
-        val b2 = createFilter(left, b1)
-        b2 match {
-          case Some(b) => val b3 = createFilter(right, b)
-            if (b3.isDefined) {
-              Some(b3.get.end)
-            } else {
-              None
-            }
-          case _ => None
-        }
-      }
-      case p@Or(left: Filter, right: Filter) => {
-        val b1 = builder.startOr()
-        val b2 = createFilter(left, b1)
-        b2 match {
-          case Some(b) => val b3 = createFilter(right, b)
-            if (b3.isDefined) {
-              Some(b3.get.end)
-            } else {
-              None
-            }
-          case _ => None
-        }
-      }
-      case p@Not(child: Filter) => {
-        val b1 = builder.startNot()
-        val b2 = createFilter(child, b1)
-        b2 match {
-          case Some(b) => Some(b.end)
-          case _ => None
-        }
-      }
-      case p@EqualTo(attribute: String, value: Any) => {
-        val b1 = builder.equals(attribute, value)
-        Some(b1)
-      }
-      case p@LessThan(attribute: String, value: Any) => {
-        val b1 = builder.lessThan(attribute ,value)
-        Some(b1)
-      }
-      case p@LessThanOrEqual(attribute: String, value: Any) => {
-        val b1 = builder.lessThanEquals(attribute, value)
-        Some(b1)
-      }
-      case p@GreaterThan(attribute: String, value: Any) => {
-        val b1 = builder.startNot().lessThanEquals(attribute, value).end()
-        Some(b1)
-      }
-      case p@GreaterThanOrEqual(attribute: String, value: Any) => {
-        val b1 = builder.startNot().lessThan(attribute, value).end()
-        Some(b1)
-      }
-      case p@IsNull(attribute: String) => {
-        val b1 = builder.isNull(attribute)
-        Some(b1)
-      }
-      case p@IsNotNull(attribute: String) => {
-        val b1 = builder.startNot().isNull(attribute).end()
-        Some(b1)
-      }
-      case p@In(attribute: String, values: Array[Any]) => {
-        val b1 = builder.in(attribute, values)
+  private def createFilter(expression: Filter, maybeBuilder: Option[Builder]): Option[Builder] = {
+    maybeBuilder.flatMap { builder =>
+      expression match {
+        case p@And(left, right) =>
+          for {
+            lhs <- createFilter(left, Some(builder.startAnd()))
+            rhs <- createFilter(right, Some(lhs))
+          } yield rhs.end()
+        case p@Or(left, right) =>
+          for {
+            lhs <- createFilter(left, Some(builder.startOr()))
+            rhs <- createFilter(right, Some(lhs))
+          } yield rhs.end()
+        case p@Not(child) =>
+          createFilter(child, Some(builder.startNot())).map(_.end())
+        case p@EqualTo(attribute, value) =>
+
Some(builder.equals(attribute, value)) + case p@LessThan(attribute, value) => + Some(builder.lessThan(attribute, value)) + case p@LessThanOrEqual(attribute, value) => + Some(builder.lessThanEquals(attribute, value)) + case p@GreaterThan(attribute, value) => + Some(builder.startNot().lessThanEquals(attribute, value).end()) + case p@GreaterThanOrEqual(attribute, value) => + Some(builder.startNot().lessThan(attribute, value).end()) + case p@IsNull(attribute) => + Some(builder.isNull(attribute)) + case p@IsNotNull(attribute) => + Some(builder.startNot().isNull(attribute).end()) + case p@In(attribute, values) => + Some(builder.in(attribute, values)) + case _ => None } - // not supported in filter - // case p@EqualNullSafe(left: String, right: String) => { - // val b1 = builder.nullSafeEquals(left, right) - // Some(b1) - // } - case _ => None } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 816f3794a6a02..c68a58647cad7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -26,11 +26,11 @@ import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, TypeInfo} import org.apache.hadoop.io.{Writable, NullWritable} import org.apache.hadoop.mapred.{RecordWriter, Reporter, JobConf} import org.apache.hadoop.mapreduce.{TaskID, TaskAttemptContext} + import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.hive.HiveMetastoreTypes import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} @@ -39,7 +39,6 @@ import scala.collection.JavaConversions._ private[sql] class DefaultSource extends FSBasedRelationProvider { - def createRelation( sqlContext: SQLContext, paths: Array[String], @@ -54,7 +53,7 @@ private[sql] class DefaultSource extends FSBasedRelationProvider { private[sql] class OrcOutputWriter extends OutputWriter with SparkHadoopMapRedUtil { - var recordWriter: RecordWriter[NullWritable, Writable] = _ + var taskAttemptContext: TaskAttemptContext = _ var serializer: OrcSerde = _ var wrappers: Array[Any => Any] = _ @@ -62,90 +61,75 @@ private[sql] class OrcOutputWriter extends OutputWriter with SparkHadoopMapRedUt var path: String = _ var dataSchema: StructType = _ var fieldOIs: Array[ObjectInspector] = _ - var standardOI: StructObjectInspector = _ - + var structOI: StructObjectInspector = _ + var outputData: Array[Any] = _ + lazy val recordWriter: RecordWriter[NullWritable, Writable] = { + created = true + val conf = taskAttemptContext.getConfiguration + val taskId: TaskID = taskAttemptContext.getTaskAttemptID.getTaskID + val partition: Int = taskId.getId + val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc" + val file = new Path(path, filename) + val fs = file.getFileSystem(conf) + val outputFormat = new OrcOutputFormat() + outputFormat.getRecordWriter(fs, + conf.asInstanceOf[JobConf], + file.toUri.getPath, Reporter.NULL) + .asInstanceOf[org.apache.hadoop.mapred.RecordWriter[NullWritable, Writable]] + } override def init(path: String, dataSchema: StructType, context: TaskAttemptContext): Unit = { this.path = path - this.dataSchema = dataSchema taskAttemptContext = context + val orcSchema = 
HiveMetastoreTypes.toMetastoreType(dataSchema) + serializer = new OrcSerde + val typeInfo: TypeInfo = + TypeInfoUtils.getTypeInfoFromTypeString(orcSchema) + structOI = TypeInfoUtils + .getStandardJavaObjectInspectorFromTypeInfo(typeInfo) + .asInstanceOf[StructObjectInspector] + fieldOIs = structOI + .getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + outputData = new Array[Any](fieldOIs.length) + wrappers = fieldOIs.map(HadoopTypeConverter.wrappers) } - // Avoid create empty file without schema attached - private def initWriter() = { - if (!created) { - created = true - val conf = taskAttemptContext.getConfiguration - val outputFormat = new OrcOutputFormat() - val taskId: TaskID = taskAttemptContext.getTaskAttemptID.getTaskID - val partition: Int = taskId.getId - val filename = s"part-r-${partition}-${System.currentTimeMillis}.orc" - val file = new Path(path, filename) - val fs = file.getFileSystem(conf) - val orcSchema = HiveMetastoreTypes.toMetastoreType(dataSchema) - - serializer = new OrcSerde - val typeInfo: TypeInfo = - TypeInfoUtils.getTypeInfoFromTypeString(orcSchema) - standardOI = TypeInfoUtils - .getStandardJavaObjectInspectorFromTypeInfo(typeInfo) - .asInstanceOf[StructObjectInspector] - fieldOIs = standardOI - .getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray - wrappers = fieldOIs.map(HadoopTypeConverter.wrappers) - recordWriter = { - outputFormat.getRecordWriter(fs, - conf.asInstanceOf[JobConf], - file.toUri.getPath, Reporter.NULL) - .asInstanceOf[org.apache.hadoop.mapred.RecordWriter[NullWritable, Writable]] - } - } - } override def write(row: Row): Unit = { - initWriter() var i = 0 - val outputData = new Array[Any](fieldOIs.length) while (i < row.length) { outputData(i) = wrappers(i)(row(i)) i += 1 } - val writable = serializer.serialize(outputData, standardOI) + val writable = serializer.serialize(outputData, structOI) recordWriter.write(NullWritable.get(), writable) } override def close(): Unit = { - if (recordWriter != null) { + if (created) { recordWriter.close(Reporter.NULL) } } } - @DeveloperApi -private[sql] case class OrcRelation(override val paths: Array[String], +private[sql] case class OrcRelation( + override val paths: Array[String], parameters: Map[String, String], maybeSchema: Option[StructType] = None, maybePartitionSpec: Option[PartitionSpec] = None)( @transient val sqlContext: SQLContext) extends FSBasedRelation(paths, maybePartitionSpec) with Logging { - self: Product => - @transient val conf = sqlContext.sparkContext.hadoopConfiguration - - - override def dataSchema: StructType = - maybeSchema.getOrElse(OrcFileOperator.readSchema(paths(0), Some(conf))) + override val dataSchema: StructType = + maybeSchema.getOrElse(OrcFileOperator.readSchema(paths(0), + Some(sqlContext.sparkContext.hadoopConfiguration))) override def outputWriterClass: Class[_ <: OutputWriter] = classOf[OrcOutputWriter] - /** Attributes */ - var output: Seq[Attribute] = schema.toAttributes override def needConversion: Boolean = false - // Equals must also take into account the output attributes so that we can distinguish between - // different instances of the same relation, override def equals(other: Any): Boolean = other match { case that: OrcRelation => paths.toSet == that.paths.toSet && @@ -162,6 +146,7 @@ private[sql] case class OrcRelation(override val paths: Array[String], schema, maybePartitionSpec) } + override def buildScan(requiredColumns: Array[String], filters: Array[Filter], inputPaths: Array[String]): RDD[Row] = { @@ -169,8 +154,3 @@ private[sql] 
case class OrcRelation(override val paths: Array[String], OrcTableScan(output, this, filters, inputPaths).execute() } } - -private[sql] object OrcRelation extends Logging { - // Default partition name to use when the partition column value is null or empty string. - val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala index 94c78a14524b5..2163b0ce70e99 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala @@ -20,50 +20,52 @@ package org.apache.spark.sql.hive.orc import java.util._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.io.orc._ import org.apache.hadoop.io.NullWritable import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.spark.rdd.RDD + +import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.hive.HiveShim import org.apache.spark.sql.sources.Filter import org.apache.spark.{Logging, SerializableWritable} + +/* Implicit conversions */ import scala.collection.JavaConversions._ -case class OrcTableScan(attributes: Seq[Attribute], +private[orc] case class OrcTableScan(attributes: Seq[Attribute], @transient relation: OrcRelation, filters: Array[Filter], inputPaths: Array[String]) extends Logging { - @transient val sqlContext = relation.sqlContext - val path = relation.paths(0) + @transient private val sqlContext = relation.sqlContext - def addColumnIds(output: Seq[Attribute], - relation: OrcRelation, conf: Configuration) { - val ids = - output.map(a => - relation.dataSchema.toAttributes.indexWhere(_.name == a.name): Integer) - .filter(_ >= 0) - val names = attributes.map(_.name) - val sorted = ids.zip(names).sorted - HiveShim.appendReadColumns(conf, sorted.map(_._1), sorted.map(_._2)) + private def addColumnIds( + output: Seq[Attribute], + relation: OrcRelation, + conf: Configuration): Unit = { + val ids = output.map(a => relation.dataSchema.fieldIndex(a.name): Integer) + val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIds, sortedNames) } - def buildFilter(job: Job, filters: Array[Filter]): Unit = { + private def buildFilter(job: Job, filters: Array[Filter]): Unit = { if (ORC_FILTER_PUSHDOWN_ENABLED) { val conf: Configuration = job.getConfiguration - val recordFilter = OrcFilters.createFilter(filters) - if (recordFilter.isDefined) { - conf.set(SARG_PUSHDOWN, toKryo(recordFilter.get)) - conf.setBoolean(INDEX_FILTER, true) + OrcFilters.createFilter(filters).foreach { f => + conf.set(SARG_PUSHDOWN, toKryo(f)) + conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true) } } } // Transform all given raw `Writable`s into `Row`s. 
- def fillObject(conf: Configuration, + private def fillObject( + path: String, + conf: Configuration, iterator: Iterator[org.apache.hadoop.io.Writable], nonPartitionKeyAttrs: Seq[(Attribute, Int)], mutableRow: MutableRow): Iterator[Row] = { @@ -77,7 +79,6 @@ case class OrcTableScan(attributes: Seq[Attribute], // Map each tuple to a row object iterator.map { value => val raw = deserializer.deserialize(value) - logDebug("Raw data: " + raw) var i = 0 while (i < fieldRefs.length) { val fieldValue = soi.getStructFieldData(raw, fieldRefs(i)) @@ -105,11 +106,13 @@ case class OrcTableScan(attributes: Seq[Attribute], Class[_ <: org.apache.hadoop.mapred.InputFormat[NullWritable, Writable]]] val rdd = sc.hadoopRDD(conf.asInstanceOf[JobConf], - inputClass, classOf[NullWritable], classOf[Writable]).map(_._2) - val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) + inputClass, classOf[NullWritable], classOf[Writable]) + .asInstanceOf[HadoopRDD[NullWritable, Writable]] val wrappedConf = new SerializableWritable(conf) - val rowRdd: RDD[Row] = rdd.mapPartitions { iter => - fillObject(wrappedConf.value, iter, attributes.zipWithIndex, mutableRow) + val rowRdd: RDD[Row] = rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iter) => + val pathStr = split.getPath.toString + val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) + fillObject(pathStr, wrappedConf.value, iter.map(_._2), attributes.zipWithIndex, mutableRow) } rowRdd } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala index 93af5af196dab..b219fbb44ca0d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala @@ -21,19 +21,21 @@ import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.Kryo import org.apache.commons.codec.binary.Base64 import org.apache.spark.sql.{SaveMode, DataFrame} -import scala.reflect.runtime.universe.{TypeTag, typeTag} package object orc { implicit class OrcContext(sqlContext: HiveContext) { import sqlContext._ @scala.annotation.varargs - def orcFile(paths: String*): DataFrame = { - if (paths.isEmpty) { - emptyDataFrame - } else { - val orcRelation = OrcRelation(paths.toArray, Map.empty)(sqlContext) - sqlContext.baseRelationToDataFrame(orcRelation) + def orcFile(path: String, paths: String*): DataFrame = { + val pathArray: Array[String] = { + if (paths.isEmpty) { + Array(path) + } else { + paths.toArray ++ Array(path) + } } + val orcRelation = OrcRelation(pathArray, Map.empty)(sqlContext) + sqlContext.baseRelationToDataFrame(orcRelation) } } @@ -49,8 +51,7 @@ package object orc { // Flags for orc copression, predicates pushdown, etc. 
val orcDefaultCompressVar = "hive.exec.orc.default.compress" var ORC_FILTER_PUSHDOWN_ENABLED = true - val SARG_PUSHDOWN = "sarg.pushdown"; - val INDEX_FILTER = "hive.optimize.index.filter" + val SARG_PUSHDOWN = "sarg.pushdown" def toKryo(input: Any): String = { val out = new Output(4 * 1024, 10 * 1024 * 1024); diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala index 7ebf3c6eced26..31a829a81124d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.orc import java.io.File +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.hive.test.TestHive @@ -37,7 +38,7 @@ case class OrcParData(intField: Int, stringField: String) case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String) class OrcPartitionDiscoverySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { - val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__" + val defaultPartitionName = ConfVars.DEFAULTPARTITIONNAME.defaultVal def withTempDir(f: File => Unit): Unit = { val dir = Utils.createTempDir().getCanonicalFile @@ -187,7 +188,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with FunSuiteLike with Before "org.apache.spark.sql.hive.orc.DefaultSource", Map( "path" -> base.getCanonicalPath, - OrcRelation.DEFAULT_PARTITION_NAME -> defaultPartitionName)) + ConfVars.DEFAULTPARTITIONNAME.varname -> defaultPartitionName)) orcRelation.registerTempTable("t") @@ -232,7 +233,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with FunSuiteLike with Before "org.apache.spark.sql.hive.orc.DefaultSource", Map( "path" -> base.getCanonicalPath, - OrcRelation.DEFAULT_PARTITION_NAME -> defaultPartitionName)) + ConfVars.DEFAULTPARTITIONNAME.varname -> defaultPartitionName)) orcRelation.registerTempTable("t") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 3c596d0654324..475af3d4c94e4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -153,13 +153,40 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { data.toDF().saveAsOrcFile(tempDir) val f = TestHive.orcFile(tempDir) f.registerTempTable("tmp") + + // ppd: + // leaf-0 = (LESS_THAN_EQUALS age 5) + // expr = leaf-0 var rdd = sql("SELECT name FROM tmp where age <= 5") assert(rdd.count() == 5) + // ppd: + // leaf-0 = (LESS_THAN_EQUALS age 5) + // expr = (not leaf-0) rdd = sql("SELECT name, contacts FROM tmp where age > 5") assert(rdd.count() == 5) - val contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) + var contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) assert(contacts.count() == 10) + + // ppd: + // leaf-0 = (LESS_THAN_EQUALS age 5) + // leaf-1 = (LESS_THAN age 8) + // expr = (and (not leaf-0) leaf-1) + rdd = sql("SELECT name, contacts FROM tmp where age > 5 and age < 8") + assert(rdd.count() == 2) + contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) + assert(contacts.count() == 4) + + // ppd: + // leaf-0 = (LESS_THAN age 2) + // leaf-1 = 
(LESS_THAN_EQUALS age 8) + // expr = (or leaf-0 (not leaf-1)) + rdd = sql("SELECT name, contacts FROM tmp where age < 2 or age > 8") + assert(rdd.count() == 3) + contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) + assert(contacts.count() == 6) + + Utils.deleteRecursively(new File(tempDir)) }
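
Usage sketch (an illustration, not part of the patch): the snippet below shows how the ORC data source exercised by these tests can be used end to end. It assumes an existing SparkContext named `sc`, a writable path `/tmp/people.orc`, and a hypothetical `Person` case class; `orcFile` and `saveAsOrcFile` come from the implicits in `org.apache.spark.sql.hive.orc`, as used in the test suites above.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.orc._

case class Person(name: String, age: Int)

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

// Write ten rows as ORC; this goes through OrcRelation / OrcOutputWriter.
val people = (1 to 10).map(i => Person(s"name_$i", i)).toDF()
people.saveAsOrcFile("/tmp/people.orc")

// Read the files back through the new orcFile(path, paths*) signature.
val df = hiveContext.orcFile("/tmp/people.orc")
df.registerTempTable("people")

// With ORC_FILTER_PUSHDOWN_ENABLED, OrcFilters.createFilter turns `age <= 5`
// into a SearchArgument that is pushed down to the ORC record reader.
hiveContext.sql("SELECT name FROM people WHERE age <= 5").show()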