From 010c460d627c1917dc47b09e59fd41172bbf90b3 Mon Sep 17 00:00:00 2001
From: Anant
Date: Fri, 20 Jun 2014 18:54:00 -0700
Subject: [PATCH 1/5] [SPARK-2061] Made splits deprecated in JavaRDDLike

The jira for the issue can be found at: https://issues.apache.org/jira/browse/SPARK-2061

Most of Spark has moved over to consistently using `partitions` instead of `splits`. We should do likewise and add a `partitions` method to JavaRDDLike and have `splits` just call that. We should also go through all cases where other APIs (e.g. Python) call `splits` and change those to use the newer API.

Author: Anant

Closes #1062 from anantasty/SPARK-2061 and squashes the following commits:

b83ce6b [Anant] Fixed syntax issue
21f9210 [Anant] Fixed version number in deprecation string
9315b76 [Anant] made related changes to use partitions in python api
8c62dd1 [Anant] Made splits deprecated in JavaRDDLike
---
 .../main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 5 ++++-
 core/src/test/java/org/apache/spark/JavaAPISuite.java      | 2 +-
 python/pyspark/context.py                                  | 2 +-
 python/pyspark/rdd.py                                      | 4 ++--
 4 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 330569a8d8837..f917cfd1419ec 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -43,8 +43,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   def rdd: RDD[T]
 
-  /** Set of partitions in this RDD. */
+  @deprecated("Use partitions() instead.", "1.1.0")
   def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)
+
+  /** Set of partitions in this RDD. */
+  def partitions: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)
 
   /** The [[org.apache.spark.SparkContext]] that this RDD was created on.
*/ def context: SparkContext = rdd.context diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 761f2d6a77d33..1d7a7be6cfeb3 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -741,7 +741,7 @@ public void persist() { public void iterator() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); TaskContext context = new TaskContext(0, 0, 0, false, new TaskMetrics()); - Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue()); + Assert.assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue()); } @Test diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 062bec2381a8f..95c54e7a5ad63 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -704,7 +704,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False): [0, 1, 16, 25] """ if partitions == None: - partitions = range(rdd._jrdd.splits().size()) + partitions = range(rdd._jrdd.partitions().size()) javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client) # Implementation note: This is implemented as a mapPartitions followed diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1d55c35a8bf48..f64f48e3a4c9c 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -321,7 +321,7 @@ def getNumPartitions(self): >>> rdd.getNumPartitions() 2 """ - return self._jrdd.splits().size() + return self._jrdd.partitions().size() def filter(self, f): """ @@ -922,7 +922,7 @@ def take(self, num): [91, 92, 93] """ items = [] - totalParts = self._jrdd.splits().size() + totalParts = self._jrdd.partitions().size() partsScanned = 0 while len(items) < num and partsScanned < totalParts: From 648553d48ee1f830406750b50ec4cc322bcf47fe Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 20 Jun 2014 20:05:12 -0700 Subject: [PATCH 2/5] Fix some tests. - JavaAPISuite was trying to compare a bare path with a URI. Fix by extracting the path from the URI, since we know it should be a local path anyway/ - b9be1609 excluded the ASM dependency everywhere, but easymock needs it (because cglib needs it). So re-add the dependency, with test scope this time. The second one above actually uncovered a weird situation: the maven test target works, even though I can't find the class sbt complains about in its classpath. sbt complains with: [error] Uncaught exception when running org.apache.spark.util .random.RandomSamplerSuite: java.lang.NoClassDefFoundError: org/objectweb/asm/Type To avoid more weirdness caused by that, I explicitly added the asm dependency to both maven and sbt (for tests only), and verified the classes don't end up in the final assembly. Author: Marcelo Vanzin Closes #917 from vanzin/flaky-tests and squashes the following commits: d022320 [Marcelo Vanzin] Fix some tests. 
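As a small standalone illustration of the path-vs-URI mismatch described above (the paths here are hypothetical, not taken from the test): the keys the wholeTextFiles RDD reports are full file: URIs, while the test's expected map was keyed by bare local paths, so a direct string comparison fails until the path component is extracted, which is what the new URI(...).getPath change in the diff below does.

import java.net.URI

object UriVsPath {
  def main(args: Array[String]): Unit = {
    // Hypothetical keys: the RDD reports the file as a URI, while the test
    // built its expected map from the bare local path it wrote to.
    val keyFromRdd  = "file:/tmp/spark-test/part-00000"
    val keyFromTest = "/tmp/spark-test/part-00000"

    println(keyFromRdd == keyFromTest)                   // false: "file:" scheme prefix differs
    println(new URI(keyFromRdd).getPath == keyFromTest)  // true: compare only the path component
  }
}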
--- core/pom.xml | 5 +++++ core/src/test/java/org/apache/spark/JavaAPISuite.java | 5 +++-- pom.xml | 7 +++++++ project/SparkBuild.scala | 10 ++++++---- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index bd6767e03bb9d..8c23842730e37 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -244,6 +244,11 @@ easymockclassextension test + + asm + asm + test + com.novocode junit-interface diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 1d7a7be6cfeb3..b2868b59ce6c6 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -18,6 +18,7 @@ package org.apache.spark; import java.io.*; +import java.net.URI; import java.util.*; import scala.Tuple2; @@ -768,7 +769,7 @@ public void textFiles() throws IOException { } @Test - public void wholeTextFiles() throws IOException { + public void wholeTextFiles() throws Exception { byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8"); @@ -784,7 +785,7 @@ public void wholeTextFiles() throws IOException { List> result = readRDD.collect(); for (Tuple2 res : result) { - Assert.assertEquals(res._2(), container.get(res._1())); + Assert.assertEquals(res._2(), container.get(new URI(res._1()).getPath())); } } diff --git a/pom.xml b/pom.xml index 0d46bb4114f73..05f76d566e9d1 100644 --- a/pom.xml +++ b/pom.xml @@ -468,6 +468,13 @@ 3.1 test + + + asm + asm + 3.3.1 + test + org.mockito mockito-all diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 7bb39dc77120b..55a2aa0fc7141 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -293,7 +293,9 @@ object SparkBuild extends Build { "com.novocode" % "junit-interface" % "0.10" % "test", "org.easymock" % "easymockclassextension" % "3.1" % "test", "org.mockito" % "mockito-all" % "1.9.0" % "test", - "junit" % "junit" % "4.10" % "test" + "junit" % "junit" % "4.10" % "test", + // Needed by cglib which is needed by easymock. + "asm" % "asm" % "3.3.1" % "test" ), testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), @@ -461,7 +463,7 @@ object SparkBuild extends Build { def toolsSettings = sharedSettings ++ Seq( name := "spark-tools", - libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-compiler" % v ), + libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-compiler" % v), libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v ) ) ++ assemblySettings ++ extraAssemblySettings @@ -630,9 +632,9 @@ object SparkBuild extends Build { scalaVersion := "2.10.4", retrieveManaged := true, retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", - libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq", + libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq", "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter", - "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx", + "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx", "spark-core").map(sparkPreviousArtifact(_).get intransitive()) ) From ca5d8b5904dc6dd5b691af506d3a842e508b3673 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 20 Jun 2014 22:49:48 -0700 Subject: [PATCH 3/5] [SQL] Pass SQLContext instead of SparkContext into physical operators. This makes it easier to use config options in operators. 
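To make the convention concrete, here is a minimal sketch of the shape the operators below take after this change; the ExamplePassThrough operator is hypothetical and not part of the patch. The SQLContext arrives in a second, @transient argument list, otherCopyArgs re-supplies it when Catalyst copies the node during planning, and the SparkContext (plus any SQL configuration) is reached through it rather than being passed in directly.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}

// Hypothetical operator mirroring the pattern used by Union/Limit/TakeOrdered in this patch.
case class ExamplePassThrough(child: SparkPlan)(@transient sqlContext: SQLContext)
  extends UnaryNode {

  // The curried SQLContext is not a primary-constructor field of the case class,
  // so copies made during tree transformations must pass it back in explicitly.
  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  // The SparkContext (and, through sqlContext, config options) is reached via the
  // context instead of a separate SparkContext argument: collect the child's rows
  // and re-parallelize them into a single partition, as Limit/TakeOrdered do.
  override def execute(): RDD[Row] =
    sqlContext.sparkContext.makeRDD(child.execute().map(_.copy()).collect(), 1)
}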
Author: Reynold Xin Closes #1164 from rxin/sqlcontext and squashes the following commits: 797b2fd [Reynold Xin] Pass SQLContext instead of SparkContext into physical operators. --- .../org/apache/spark/sql/SQLContext.scala | 4 +++- .../spark/sql/execution/Aggregate.scala | 5 +++-- .../spark/sql/execution/SparkStrategies.scala | 22 +++++++++---------- .../spark/sql/execution/basicOperators.scala | 20 +++++++++-------- .../apache/spark/sql/execution/joins.scala | 21 +++++++++--------- .../sql/parquet/ParquetTableOperations.scala | 21 +++++++++--------- .../spark/sql/parquet/ParquetQuerySuite.scala | 2 +- 7 files changed, 51 insertions(+), 44 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ab376e5504d35..c60af28b2a1f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -221,7 +221,9 @@ class SQLContext(@transient val sparkContext: SparkContext) } protected[sql] class SparkPlanner extends SparkStrategies { - val sparkContext = self.sparkContext + val sparkContext: SparkContext = self.sparkContext + + val sqlContext: SQLContext = self def numPartitions = self.numShufflePartitions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala index 34d88fe4bd7de..d85d2d7844e0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkContext import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.SQLContext /** * :: DeveloperApi :: @@ -41,7 +42,7 @@ case class Aggregate( partial: Boolean, groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], - child: SparkPlan)(@transient sc: SparkContext) + child: SparkPlan)(@transient sqlContext: SQLContext) extends UnaryNode with NoBind { override def requiredChildDistribution = @@ -55,7 +56,7 @@ case class Aggregate( } } - override def otherCopyArgs = sc :: Nil + override def otherCopyArgs = sqlContext :: Nil // HACK: Generators don't correctly preserve their output through serializations so we grab // out child's output attributes statically here. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4694f25d6d630..bd8ae4cddef89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -40,7 +40,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // no predicate can be evaluated by matching hash keys case logical.Join(left, right, LeftSemi, condition) => execution.LeftSemiJoinBNL( - planLater(left), planLater(right), condition)(sparkContext) :: Nil + planLater(left), planLater(right), condition)(sqlContext) :: Nil case _ => Nil } } @@ -103,7 +103,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { partial = true, groupingExpressions, partialComputation, - planLater(child))(sparkContext))(sparkContext) :: Nil + planLater(child))(sqlContext))(sqlContext) :: Nil } else { Nil } @@ -115,7 +115,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Join(left, right, joinType, condition) => execution.BroadcastNestedLoopJoin( - planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil + planLater(left), planLater(right), joinType, condition)(sqlContext) :: Nil case _ => Nil } } @@ -143,7 +143,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object TakeOrdered extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) => - execution.TakeOrdered(limit, order, planLater(child))(sparkContext) :: Nil + execution.TakeOrdered(limit, order, planLater(child))(sqlContext) :: Nil case _ => Nil } } @@ -155,9 +155,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { val relation = ParquetRelation.create(path, child, sparkContext.hadoopConfiguration) // Note: overwrite=false because otherwise the metadata we just created will be deleted - InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sparkContext) :: Nil + InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sqlContext) :: Nil case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => - InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil + InsertIntoParquetTable(table, planLater(child), overwrite)(sqlContext) :: Nil case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => val prunePushedDownFilters = if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { @@ -186,7 +186,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { projectList, filters, prunePushedDownFilters, - ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil + ParquetTableScan(_, relation, filters)(sqlContext)) :: Nil case _ => Nil } @@ -211,7 +211,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Distinct(child) => execution.Aggregate( - partial = false, child.output, child.output, planLater(child))(sparkContext) :: Nil + partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil case logical.Sort(sortExprs, child) => // This sort is a global sort. 
Its requiredDistribution will be an OrderedDistribution. execution.Sort(sortExprs, global = true, planLater(child)):: Nil @@ -224,7 +224,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Filter(condition, child) => execution.Filter(condition, planLater(child)) :: Nil case logical.Aggregate(group, agg, child) => - execution.Aggregate(partial = false, group, agg, planLater(child))(sparkContext) :: Nil + execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil case logical.Sample(fraction, withReplacement, seed, child) => execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil case logical.LocalRelation(output, data) => @@ -233,9 +233,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)) execution.ExistingRdd(output, dataAsRdd) :: Nil case logical.Limit(IntegerLiteral(limit), child) => - execution.Limit(limit, planLater(child))(sparkContext) :: Nil + execution.Limit(limit, planLater(child))(sqlContext) :: Nil case Unions(unionChildren) => - execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil + execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 8969794c69933..18f4a5877bb21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -20,8 +20,9 @@ package org.apache.spark.sql.execution import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} +import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.rdd.{RDD, ShuffledRDD} +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ @@ -70,12 +71,12 @@ case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: * :: DeveloperApi :: */ @DeveloperApi -case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan { +case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan { // TODO: attributes output by union should be distinct for nullability purposes override def output = children.head.output - override def execute() = sc.union(children.map(_.execute())) + override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) - override def otherCopyArgs = sc :: Nil + override def otherCopyArgs = sqlContext :: Nil } /** @@ -87,11 +88,12 @@ case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends * data to a single partition to compute the global limit. 
*/ @DeveloperApi -case class Limit(limit: Int, child: SparkPlan)(@transient sc: SparkContext) extends UnaryNode { +case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext) + extends UnaryNode { // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan: // partition local limit -> exchange into one partition -> partition local limit again - override def otherCopyArgs = sc :: Nil + override def otherCopyArgs = sqlContext :: Nil override def output = child.output @@ -117,8 +119,8 @@ case class Limit(limit: Int, child: SparkPlan)(@transient sc: SparkContext) exte */ @DeveloperApi case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) - (@transient sc: SparkContext) extends UnaryNode { - override def otherCopyArgs = sc :: Nil + (@transient sqlContext: SQLContext) extends UnaryNode { + override def otherCopyArgs = sqlContext :: Nil override def output = child.output @@ -129,7 +131,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) // TODO: Terminal split should be implemented differently from non-terminal split. // TODO: Pick num splits based on |limit|. - override def execute() = sc.makeRDD(executeCollect(), 1) + override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 8d7a5ba59f96a..84bdde38b7e9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -19,9 +19,8 @@ package org.apache.spark.sql.execution import scala.collection.mutable.{ArrayBuffer, BitSet} -import org.apache.spark.SparkContext - import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning} @@ -200,13 +199,13 @@ case class LeftSemiJoinHash( @DeveloperApi case class LeftSemiJoinBNL( streamed: SparkPlan, broadcast: SparkPlan, condition: Option[Expression]) - (@transient sc: SparkContext) + (@transient sqlContext: SQLContext) extends BinaryNode { // TODO: Override requiredChildDistribution. override def outputPartitioning: Partitioning = streamed.outputPartitioning - override def otherCopyArgs = sc :: Nil + override def otherCopyArgs = sqlContext :: Nil def output = left.output @@ -223,7 +222,8 @@ case class LeftSemiJoinBNL( def execute() = { - val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) + val broadcastedRelation = + sqlContext.sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) streamed.execute().mapPartitions { streamedIter => val joinedRow = new JoinedRow @@ -263,13 +263,13 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod @DeveloperApi case class BroadcastNestedLoopJoin( streamed: SparkPlan, broadcast: SparkPlan, joinType: JoinType, condition: Option[Expression]) - (@transient sc: SparkContext) + (@transient sqlContext: SQLContext) extends BinaryNode { // TODO: Override requiredChildDistribution. 
override def outputPartitioning: Partitioning = streamed.outputPartitioning - override def otherCopyArgs = sc :: Nil + override def otherCopyArgs = sqlContext :: Nil def output = left.output ++ right.output @@ -286,7 +286,8 @@ case class BroadcastNestedLoopJoin( def execute() = { - val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) + val broadcastedRelation = + sqlContext.sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter => val matchedRows = new ArrayBuffer[Row] @@ -337,7 +338,7 @@ case class BroadcastNestedLoopJoin( } // TODO: Breaks lineage. - sc.union( - streamedPlusMatches.flatMap(_._1), sc.makeRDD(rightOuterMatches)) + sqlContext.sparkContext.union( + streamedPlusMatches.flatMap(_._1), sqlContext.sparkContext.makeRDD(rightOuterMatches)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 624f2e2fa13f6..ade823b51c9cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -33,10 +33,10 @@ import parquet.hadoop.util.ContextUtil import parquet.io.InvalidRecordException import parquet.schema.MessageType -import org.apache.spark.{Logging, SerializableWritable, SparkContext, TaskContext} +import org.apache.spark.{Logging, SerializableWritable, TaskContext} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} -import org.apache.spark.sql.catalyst.types.StructType import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} /** @@ -49,10 +49,11 @@ case class ParquetTableScan( output: Seq[Attribute], relation: ParquetRelation, columnPruningPred: Seq[Expression])( - @transient val sc: SparkContext) + @transient val sqlContext: SQLContext) extends LeafNode { override def execute(): RDD[Row] = { + val sc = sqlContext.sparkContext val job = new Job(sc.hadoopConfiguration) ParquetInputFormat.setReadSupportClass( job, @@ -93,7 +94,7 @@ case class ParquetTableScan( .filter(_ != null) // Parquet's record filters may produce null values } - override def otherCopyArgs = sc :: Nil + override def otherCopyArgs = sqlContext :: Nil /** * Applies a (candidate) projection. 
@@ -104,7 +105,7 @@ case class ParquetTableScan( def pruneColumns(prunedAttributes: Seq[Attribute]): ParquetTableScan = { val success = validateProjection(prunedAttributes) if (success) { - ParquetTableScan(prunedAttributes, relation, columnPruningPred)(sc) + ParquetTableScan(prunedAttributes, relation, columnPruningPred)(sqlContext) } else { sys.error("Warning: Could not validate Parquet schema projection in pruneColumns") this @@ -152,7 +153,7 @@ case class InsertIntoParquetTable( relation: ParquetRelation, child: SparkPlan, overwrite: Boolean = false)( - @transient val sc: SparkContext) + @transient val sqlContext: SQLContext) extends UnaryNode with SparkHadoopMapReduceUtil { /** @@ -168,7 +169,7 @@ case class InsertIntoParquetTable( val childRdd = child.execute() assert(childRdd != null) - val job = new Job(sc.hadoopConfiguration) + val job = new Job(sqlContext.sparkContext.hadoopConfiguration) val writeSupport = if (child.output.map(_.dataType).forall(_.isPrimitive)) { @@ -204,7 +205,7 @@ case class InsertIntoParquetTable( override def output = child.output - override def otherCopyArgs = sc :: Nil + override def otherCopyArgs = sqlContext :: Nil /** * Stores the given Row RDD as a Hadoop file. @@ -231,7 +232,7 @@ case class InsertIntoParquetTable( val wrappedConf = new SerializableWritable(job.getConfiguration) val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) - val stageId = sc.newRddId() + val stageId = sqlContext.sparkContext.newRddId() val taskIdOffset = if (overwrite) { @@ -270,7 +271,7 @@ case class InsertIntoParquetTable( val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) jobCommitter.setupJob(jobTaskContext) - sc.runJob(rdd, writeShard _) + sqlContext.sparkContext.runJob(rdd, writeShard _) jobCommitter.commitJob(jobTaskContext) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 7714eb1b5628a..2ca0c1cdcbeca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -166,7 +166,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA val scanner = new ParquetTableScan( ParquetTestData.testData.output, ParquetTestData.testData, - Seq())(TestSQLContext.sparkContext) + Seq())(TestSQLContext) val projected = scanner.pruneColumns(ParquetTypesConverter .convertToAttributes(MessageTypeParser .parseMessageType(ParquetTestData.subTestSchema))) From ec935abce13b60f353236566da149c0c87bb1002 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 21 Jun 2014 12:04:18 -0700 Subject: [PATCH 4/5] [SQL] Break hiveOperators.scala into multiple files. The single file was getting very long (500+ loc). Author: Reynold Xin Closes #1166 from rxin/hiveOperators and squashes the following commits: 5b43068 [Reynold Xin] [SQL] Break hiveOperators.scala into multiple files. 
--- .../spark/sql/hive/HiveStrategies.scala | 9 +- .../execution/DescribeHiveTableCommand.scala | 88 +++ .../sql/hive/execution/HiveTableScan.scala | 223 ++++++++ .../hive/execution/InsertIntoHiveTable.scala | 248 +++++++++ .../sql/hive/execution/NativeCommand.scala | 47 ++ .../sql/hive/execution/hiveOperators.scala | 524 ------------------ 6 files changed, 610 insertions(+), 529 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index af7687b40429b..4d0fab4140b21 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -64,7 +64,6 @@ private[hive] trait HiveStrategies { val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet val (pruningPredicates, otherPredicates) = predicates.partition { _.references.map(_.exprId).subsetOf(partitionKeyIds) - } pruneFilterProject( @@ -81,16 +80,16 @@ private[hive] trait HiveStrategies { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil - case describe: logical.DescribeCommand => { + + case describe: logical.DescribeCommand => val resolvedTable = context.executePlan(describe.table).analyzed resolvedTable match { case t: MetastoreRelation => - Seq(DescribeHiveTableCommand( - t, describe.output, describe.isExtended)(context)) + Seq(DescribeHiveTableCommand(t, describe.output, describe.isExtended)(context)) case o: LogicalPlan => Seq(DescribeCommand(planLater(o), describe.output)(context)) } - } + case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala new file mode 100644 index 0000000000000..a40e89e0d382b --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import scala.collection.JavaConversions._ + +import org.apache.hadoop.hive.metastore.api.FieldSchema + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row} +import org.apache.spark.sql.execution.{Command, LeafNode} +import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} + +/** + * Implementation for "describe [extended] table". + * + * :: DeveloperApi :: + */ +@DeveloperApi +case class DescribeHiveTableCommand( + table: MetastoreRelation, + output: Seq[Attribute], + isExtended: Boolean)( + @transient context: HiveContext) + extends LeafNode with Command { + + // Strings with the format like Hive. It is used for result comparison in our unit tests. + lazy val hiveString: Seq[String] = { + val alignment = 20 + val delim = "\t" + + sideEffectResult.map { + case (name, dataType, comment) => + String.format("%-" + alignment + "s", name) + delim + + String.format("%-" + alignment + "s", dataType) + delim + + String.format("%-" + alignment + "s", Option(comment).getOrElse("None")) + } + } + + override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = { + // Trying to mimic the format of Hive's output. But not exactly the same. + var results: Seq[(String, String, String)] = Nil + + val columns: Seq[FieldSchema] = table.hiveQlTable.getCols + val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols + results ++= columns.map(field => (field.getName, field.getType, field.getComment)) + if (!partitionColumns.isEmpty) { + val partColumnInfo = + partitionColumns.map(field => (field.getName, field.getType, field.getComment)) + results ++= + partColumnInfo ++ + Seq(("# Partition Information", "", "")) ++ + Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++ + partColumnInfo + } + + if (isExtended) { + results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, "")) + } + + results + } + + override def execute(): RDD[Row] = { + val rows = sideEffectResult.map { + case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment)) + } + context.sparkContext.parallelize(rows, 1) + } + + override def otherCopyArgs = context :: Nil +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala new file mode 100644 index 0000000000000..ef8bae74530ec --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import scala.collection.JavaConversions._ + +import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.primitive._ +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.hive._ +import org.apache.spark.util.MutablePair + +/** + * :: DeveloperApi :: + * The Hive table scan operator. Column and partition pruning are both handled. + * + * @param attributes Attributes to be fetched from the Hive table. + * @param relation The Hive table be be scanned. + * @param partitionPruningPred An optional partition pruning predicate for partitioned table. + */ +@DeveloperApi +case class HiveTableScan( + attributes: Seq[Attribute], + relation: MetastoreRelation, + partitionPruningPred: Option[Expression])( + @transient val context: HiveContext) + extends LeafNode + with HiveInspectors { + + require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, + "Partition pruning predicates only supported for partitioned tables.") + + // Bind all partition key attribute references in the partition pruning predicate for later + // evaluation. + private[this] val boundPruningPred = partitionPruningPred.map { pred => + require( + pred.dataType == BooleanType, + s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") + + BindReferences.bindReference(pred, relation.partitionKeys) + } + + @transient + private[this] val hadoopReader = new HadoopTableReader(relation.tableDesc, context) + + /** + * The hive object inspector for this table, which can be used to extract values from the + * serialized row representation. + */ + @transient + private[this] lazy val objectInspector = + relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector] + + /** + * Functions that extract the requested attributes from the hive output. Partitioned values are + * casted from string to its declared data type. 
+ */ + @transient + protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = { + attributes.map { a => + val ordinal = relation.partitionKeys.indexOf(a) + if (ordinal >= 0) { + val dataType = relation.partitionKeys(ordinal).dataType + (_: Any, partitionKeys: Array[String]) => { + castFromString(partitionKeys(ordinal), dataType) + } + } else { + val ref = objectInspector.getAllStructFieldRefs + .find(_.getFieldName == a.name) + .getOrElse(sys.error(s"Can't find attribute $a")) + val fieldObjectInspector = ref.getFieldObjectInspector + + val unwrapHiveData = fieldObjectInspector match { + case _: HiveVarcharObjectInspector => + (value: Any) => value.asInstanceOf[HiveVarchar].getValue + case _: HiveDecimalObjectInspector => + (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue()) + case _ => + identity[Any] _ + } + + (row: Any, _: Array[String]) => { + val data = objectInspector.getStructFieldData(row, ref) + val hiveData = unwrapData(data, fieldObjectInspector) + if (hiveData != null) unwrapHiveData(hiveData) else null + } + } + } + } + + private[this] def castFromString(value: String, dataType: DataType) = { + Cast(Literal(value), dataType).eval(null) + } + + private def addColumnMetadataToConf(hiveConf: HiveConf) { + // Specifies IDs and internal names of columns to be scanned. + val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer) + val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",") + + if (attributes.size == relation.output.size) { + ColumnProjectionUtils.setFullyReadColumns(hiveConf) + } else { + ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs) + } + + ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name)) + + // Specifies types and object inspectors of columns to be scanned. + val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + relation.tableDesc.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames) + } + + addColumnMetadataToConf(context.hiveconf) + + private def inputRdd = if (!relation.hiveQlTable.isPartitioned) { + hadoopReader.makeRDDForTable(relation.hiveQlTable) + } else { + hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions)) + } + + /** + * Prunes partitions not involve the query plan. + * + * @param partitions All partitions of the relation. + * @return Partitions that are involved in the query plan. + */ + private[hive] def prunePartitions(partitions: Seq[HivePartition]) = { + boundPruningPred match { + case None => partitions + case Some(shouldKeep) => partitions.filter { part => + val dataTypes = relation.partitionKeys.map(_.dataType) + val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield { + castFromString(value, dataType) + } + + // Only partitioned values are needed here, since the predicate has already been bound to + // partition key attribute references. 
+ val row = new GenericRow(castedValues.toArray) + shouldKeep.eval(row).asInstanceOf[Boolean] + } + } + } + + override def execute() = { + inputRdd.mapPartitions { iterator => + if (iterator.isEmpty) { + Iterator.empty + } else { + val mutableRow = new GenericMutableRow(attributes.length) + val mutablePair = new MutablePair[Any, Array[String]]() + val buffered = iterator.buffered + + // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern + // matching are avoided intentionally. + val rowsAndPartitionKeys = buffered.head match { + // With partition keys + case _: Array[Any] => + buffered.map { case array: Array[Any] => + val deserializedRow = array(0) + val partitionKeys = array(1).asInstanceOf[Array[String]] + mutablePair.update(deserializedRow, partitionKeys) + } + + // Without partition keys + case _ => + val emptyPartitionKeys = Array.empty[String] + buffered.map { deserializedRow => + mutablePair.update(deserializedRow, emptyPartitionKeys) + } + } + + rowsAndPartitionKeys.map { pair => + var i = 0 + while (i < attributes.length) { + mutableRow(i) = attributeFunctions(i)(pair._1, pair._2) + i += 1 + } + mutableRow: Row + } + } + } + } + + override def output = attributes +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala new file mode 100644 index 0000000000000..594a803806ede --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import scala.collection.JavaConversions._ + +import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} +import org.apache.hadoop.hive.metastore.MetaStoreUtils +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import org.apache.hadoop.hive.serde2.Serializer +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} + +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} +import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter} + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class InsertIntoHiveTable( + table: MetastoreRelation, + partition: Map[String, Option[String]], + child: SparkPlan, + overwrite: Boolean) + (@transient sc: HiveContext) + extends UnaryNode { + + val outputClass = newSerializer(table.tableDesc).getSerializedClass + @transient private val hiveContext = new Context(sc.hiveconf) + @transient private val db = Hive.get(sc.hiveconf) + + private def newSerializer(tableDesc: TableDesc): Serializer = { + val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] + serializer.initialize(null, tableDesc.getProperties) + serializer + } + + override def otherCopyArgs = sc :: Nil + + def output = child.output + + /** + * Wraps with Hive types based on object inspector. + * TODO: Consolidate all hive OI/data interface code. + */ + protected def wrap(a: (Any, ObjectInspector)): Any = a match { + case (s: String, oi: JavaHiveVarcharObjectInspector) => + new HiveVarchar(s, s.size) + + case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) => + new HiveDecimal(bd.underlying()) + + case (row: Row, oi: StandardStructObjectInspector) => + val struct = oi.create() + row.zip(oi.getAllStructFieldRefs: Seq[StructField]).foreach { + case (data, field) => + oi.setStructFieldData(struct, field, wrap(data, field.getFieldObjectInspector)) + } + struct + + case (s: Seq[_], oi: ListObjectInspector) => + val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector)) + seqAsJavaList(wrappedSeq) + + case (obj, _) => + obj + } + + def saveAsHiveFile( + rdd: RDD[Writable], + valueClass: Class[_], + fileSinkConf: FileSinkDesc, + conf: JobConf, + isCompressed: Boolean) { + if (valueClass == null) { + throw new SparkException("Output value class not set") + } + conf.setOutputValueClass(valueClass) + if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) { + throw new SparkException("Output format class not set") + } + // Doesn't work in Scala 2.9 due to what may be a generics bug + // TODO: Should we uncomment this for Scala 2.10? 
+ // conf.setOutputFormat(outputFormatClass) + conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) + if (isCompressed) { + // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", + // and "mapred.output.compression.type" have no impact on ORC because it uses table properties + // to store compression information. + conf.set("mapred.output.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec")) + fileSinkConf.setCompressType(conf.get("mapred.output.compression.type")) + } + conf.setOutputCommitter(classOf[FileOutputCommitter]) + FileOutputFormat.setOutputPath( + conf, + SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf)) + + logger.debug("Saving as hadoop file of type " + valueClass.getSimpleName) + + val writer = new SparkHiveHadoopWriter(conf, fileSinkConf) + writer.preSetup() + + def writeToFile(context: TaskContext, iter: Iterator[Writable]) { + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it + // around by taking a mod. We expect that no task will be attempted 2 billion times. + val attemptNumber = (context.attemptId % Int.MaxValue).toInt + + writer.setup(context.stageId, context.partitionId, attemptNumber) + writer.open() + + var count = 0 + while(iter.hasNext) { + val record = iter.next() + count += 1 + writer.write(record) + } + + writer.close() + writer.commit() + } + + sc.sparkContext.runJob(rdd, writeToFile _) + writer.commitJob() + } + + override def execute() = result + + /** + * Inserts all the rows in the table into Hive. Row objects are properly serialized with the + * `org.apache.hadoop.hive.serde2.SerDe` and the + * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition. + * + * Note: this is run once and then kept to avoid double insertions. + */ + private lazy val result: RDD[Row] = { + val childRdd = child.execute() + assert(childRdd != null) + + // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer + // instances within the closure, since Serializer is not serializable while TableDesc is. + val tableDesc = table.tableDesc + val tableLocation = table.hiveQlTable.getDataLocation + val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) + val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) + val rdd = childRdd.mapPartitions { iter => + val serializer = newSerializer(fileSinkConf.getTableInfo) + val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + + val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val outputData = new Array[Any](fieldOIs.length) + iter.map { row => + var i = 0 + while (i < row.length) { + // Casts Strings to HiveVarchars when necessary. + outputData(i) = wrap(row(i), fieldOIs(i)) + i += 1 + } + + serializer.serialize(outputData, standardOI) + } + } + + // ORC stores compression information in table properties. While, there are other formats + // (e.g. RCFile) that rely on hadoop configurations to store compression information. + val jobConf = new JobConf(sc.hiveconf) + saveAsHiveFile( + rdd, + outputClass, + fileSinkConf, + jobConf, + sc.hiveconf.getBoolean("hive.exec.compress.output", false)) + + // TODO: Handle dynamic partitioning. 
+ val outputPath = FileOutputFormat.getOutputPath(jobConf) + // Have to construct the format of dbname.tablename. + val qualifiedTableName = s"${table.databaseName}.${table.tableName}" + // TODO: Correctly set holdDDLTime. + // In most of the time, we should have holdDDLTime = false. + // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint. + val holdDDLTime = false + if (partition.nonEmpty) { + val partitionSpec = partition.map { + case (key, Some(value)) => key -> value + case (key, None) => key -> "" // Should not reach here right now. + } + val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec) + db.validatePartitionNameCharacters(partVals) + // inheritTableSpecs is set to true. It should be set to false for a IMPORT query + // which is currently considered as a Hive native command. + val inheritTableSpecs = true + // TODO: Correctly set isSkewedStoreAsSubdir. + val isSkewedStoreAsSubdir = false + db.loadPartition( + outputPath, + qualifiedTableName, + partitionSpec, + overwrite, + holdDDLTime, + inheritTableSpecs, + isSkewedStoreAsSubdir) + } else { + db.loadTable( + outputPath, + qualifiedTableName, + overwrite, + holdDDLTime) + } + + // It would be nice to just return the childRdd unchanged so insert operations could be chained, + // however for now we return an empty list to simplify compatibility checks with hive, which + // does not return anything for insert operations. + // TODO: implement hive compatibility as rules. + sc.sparkContext.makeRDD(Nil, 1) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala new file mode 100644 index 0000000000000..fe6031678f70f --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row} +import org.apache.spark.sql.execution.{Command, LeafNode} +import org.apache.spark.sql.hive.HiveContext + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class NativeCommand( + sql: String, output: Seq[Attribute])( + @transient context: HiveContext) + extends LeafNode with Command { + + override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql) + + override def execute(): RDD[Row] = { + if (sideEffectResult.size == 0) { + context.emptyResult + } else { + val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r))) + context.sparkContext.parallelize(rows, 1) + } + } + + override def otherCopyArgs = context :: Nil +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala deleted file mode 100644 index 2de2db28a7e04..0000000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala +++ /dev/null @@ -1,524 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.execution - -import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.metastore.MetaStoreUtils -import org.apache.hadoop.hive.metastore.api.FieldSchema -import org.apache.hadoop.hive.ql.Context -import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive} -import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils -import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc} -import org.apache.hadoop.hive.serde.serdeConstants -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption -import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.hive.serde2.objectinspector.primitive._ -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils -import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer} -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapred._ - -import org.apache.spark -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.hive._ -import org.apache.spark.util.MutablePair -import org.apache.spark.{TaskContext, SparkException} - -/* Implicits */ -import scala.collection.JavaConversions._ - -/** - * :: DeveloperApi :: - * The Hive table scan operator. Column and partition pruning are both handled. - * - * @param attributes Attributes to be fetched from the Hive table. - * @param relation The Hive table be be scanned. - * @param partitionPruningPred An optional partition pruning predicate for partitioned table. - */ -@DeveloperApi -case class HiveTableScan( - attributes: Seq[Attribute], - relation: MetastoreRelation, - partitionPruningPred: Option[Expression])( - @transient val context: HiveContext) - extends LeafNode - with HiveInspectors { - - require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, - "Partition pruning predicates only supported for partitioned tables.") - - // Bind all partition key attribute references in the partition pruning predicate for later - // evaluation. - private val boundPruningPred = partitionPruningPred.map { pred => - require( - pred.dataType == BooleanType, - s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") - - BindReferences.bindReference(pred, relation.partitionKeys) - } - - @transient - val hadoopReader = new HadoopTableReader(relation.tableDesc, context) - - /** - * The hive object inspector for this table, which can be used to extract values from the - * serialized row representation. - */ - @transient - lazy val objectInspector = - relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector] - - /** - * Functions that extract the requested attributes from the hive output. Partitioned values are - * casted from string to its declared data type. 
- */ - @transient - protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = { - attributes.map { a => - val ordinal = relation.partitionKeys.indexOf(a) - if (ordinal >= 0) { - val dataType = relation.partitionKeys(ordinal).dataType - (_: Any, partitionKeys: Array[String]) => { - castFromString(partitionKeys(ordinal), dataType) - } - } else { - val ref = objectInspector.getAllStructFieldRefs - .find(_.getFieldName == a.name) - .getOrElse(sys.error(s"Can't find attribute $a")) - val fieldObjectInspector = ref.getFieldObjectInspector - - val unwrapHiveData = fieldObjectInspector match { - case _: HiveVarcharObjectInspector => - (value: Any) => value.asInstanceOf[HiveVarchar].getValue - case _: HiveDecimalObjectInspector => - (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue()) - case _ => - identity[Any] _ - } - - (row: Any, _: Array[String]) => { - val data = objectInspector.getStructFieldData(row, ref) - val hiveData = unwrapData(data, fieldObjectInspector) - if (hiveData != null) unwrapHiveData(hiveData) else null - } - } - } - } - - private def castFromString(value: String, dataType: DataType) = { - Cast(Literal(value), dataType).eval(null) - } - - private def addColumnMetadataToConf(hiveConf: HiveConf) { - // Specifies IDs and internal names of columns to be scanned. - val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer) - val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",") - - if (attributes.size == relation.output.size) { - ColumnProjectionUtils.setFullyReadColumns(hiveConf) - } else { - ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs) - } - - ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name)) - - // Specifies types and object inspectors of columns to be scanned. - val structOI = ObjectInspectorUtils - .getStandardObjectInspector( - relation.tableDesc.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - val columnTypeNames = structOI - .getAllStructFieldRefs - .map(_.getFieldObjectInspector) - .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) - .mkString(",") - - hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) - hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames) - } - - addColumnMetadataToConf(context.hiveconf) - - @transient - def inputRdd = if (!relation.hiveQlTable.isPartitioned) { - hadoopReader.makeRDDForTable(relation.hiveQlTable) - } else { - hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions)) - } - - /** - * Prunes partitions not involve the query plan. - * - * @param partitions All partitions of the relation. - * @return Partitions that are involved in the query plan. - */ - private[hive] def prunePartitions(partitions: Seq[HivePartition]) = { - boundPruningPred match { - case None => partitions - case Some(shouldKeep) => partitions.filter { part => - val dataTypes = relation.partitionKeys.map(_.dataType) - val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield { - castFromString(value, dataType) - } - - // Only partitioned values are needed here, since the predicate has already been bound to - // partition key attribute references. 
- val row = new GenericRow(castedValues.toArray) - shouldKeep.eval(row).asInstanceOf[Boolean] - } - } - } - - def execute() = { - inputRdd.mapPartitions { iterator => - if (iterator.isEmpty) { - Iterator.empty - } else { - val mutableRow = new GenericMutableRow(attributes.length) - val mutablePair = new MutablePair[Any, Array[String]]() - val buffered = iterator.buffered - - // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern - // matching are avoided intentionally. - val rowsAndPartitionKeys = buffered.head match { - // With partition keys - case _: Array[Any] => - buffered.map { case array: Array[Any] => - val deserializedRow = array(0) - val partitionKeys = array(1).asInstanceOf[Array[String]] - mutablePair.update(deserializedRow, partitionKeys) - } - - // Without partition keys - case _ => - val emptyPartitionKeys = Array.empty[String] - buffered.map { deserializedRow => - mutablePair.update(deserializedRow, emptyPartitionKeys) - } - } - - rowsAndPartitionKeys.map { pair => - var i = 0 - while (i < attributes.length) { - mutableRow(i) = attributeFunctions(i)(pair._1, pair._2) - i += 1 - } - mutableRow: Row - } - } - } - } - - def output = attributes -} - -/** - * :: DeveloperApi :: - */ -@DeveloperApi -case class InsertIntoHiveTable( - table: MetastoreRelation, - partition: Map[String, Option[String]], - child: SparkPlan, - overwrite: Boolean) - (@transient sc: HiveContext) - extends UnaryNode { - - val outputClass = newSerializer(table.tableDesc).getSerializedClass - @transient private val hiveContext = new Context(sc.hiveconf) - @transient private val db = Hive.get(sc.hiveconf) - - private def newSerializer(tableDesc: TableDesc): Serializer = { - val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] - serializer.initialize(null, tableDesc.getProperties) - serializer - } - - override def otherCopyArgs = sc :: Nil - - def output = child.output - - /** - * Wraps with Hive types based on object inspector. - * TODO: Consolidate all hive OI/data interface code. - */ - protected def wrap(a: (Any, ObjectInspector)): Any = a match { - case (s: String, oi: JavaHiveVarcharObjectInspector) => - new HiveVarchar(s, s.size) - - case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) => - new HiveDecimal(bd.underlying()) - - case (row: Row, oi: StandardStructObjectInspector) => - val struct = oi.create() - row.zip(oi.getAllStructFieldRefs: Seq[StructField]).foreach { - case (data, field) => - oi.setStructFieldData(struct, field, wrap(data, field.getFieldObjectInspector)) - } - struct - - case (s: Seq[_], oi: ListObjectInspector) => - val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector)) - seqAsJavaList(wrappedSeq) - - case (obj, _) => - obj - } - - def saveAsHiveFile( - rdd: RDD[Writable], - valueClass: Class[_], - fileSinkConf: FileSinkDesc, - conf: JobConf, - isCompressed: Boolean) { - if (valueClass == null) { - throw new SparkException("Output value class not set") - } - conf.setOutputValueClass(valueClass) - if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) { - throw new SparkException("Output format class not set") - } - // Doesn't work in Scala 2.9 due to what may be a generics bug - // TODO: Should we uncomment this for Scala 2.10? 
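
// A self-contained sketch of the pruning step in prunePartitions above: partition values
// arrive as strings, are cast to the declared partition key types once per partition, and
// the bound predicate is evaluated over just those values, so pruned partitions are never
// read. PartitionSketch and the function arguments are illustrative, not Spark's API.
object PruningSketch {
  final case class PartitionSketch(values: Seq[String])   // e.g. year=2014

  def prune(
      partitions: Seq[PartitionSketch],
      castValues: Seq[String] => Seq[Any],
      predicate: Seq[Any] => Boolean): Seq[PartitionSketch] =
    partitions.filter(p => predicate(castValues(p.values)))

  def main(args: Array[String]): Unit = {
    val parts = Seq(PartitionSketch(Seq("2013")), PartitionSketch(Seq("2014")))
    val kept = prune(
      parts,
      values => values.map(_.toInt),                     // partition key declared as int
      casted => casted.head.asInstanceOf[Int] >= 2014)   // stands in for the bound predicate
    println(kept)                                        // List(PartitionSketch(List(2014)))
  }
}
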
- // conf.setOutputFormat(outputFormatClass) - conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) - if (isCompressed) { - // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", - // and "mapred.output.compression.type" have no impact on ORC because it uses table properties - // to store compression information. - conf.set("mapred.output.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec")) - fileSinkConf.setCompressType(conf.get("mapred.output.compression.type")) - } - conf.setOutputCommitter(classOf[FileOutputCommitter]) - FileOutputFormat.setOutputPath( - conf, - SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf)) - - logger.debug("Saving as hadoop file of type " + valueClass.getSimpleName) - - val writer = new SparkHiveHadoopWriter(conf, fileSinkConf) - writer.preSetup() - - def writeToFile(context: TaskContext, iter: Iterator[Writable]) { - // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it - // around by taking a mod. We expect that no task will be attempted 2 billion times. - val attemptNumber = (context.attemptId % Int.MaxValue).toInt - - writer.setup(context.stageId, context.partitionId, attemptNumber) - writer.open() - - var count = 0 - while(iter.hasNext) { - val record = iter.next() - count += 1 - writer.write(record) - } - - writer.close() - writer.commit() - } - - sc.sparkContext.runJob(rdd, writeToFile _) - writer.commitJob() - } - - override def execute() = result - - /** - * Inserts all the rows in the table into Hive. Row objects are properly serialized with the - * `org.apache.hadoop.hive.serde2.SerDe` and the - * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition. - * - * Note: this is run once and then kept to avoid double insertions. - */ - private lazy val result: RDD[Row] = { - val childRdd = child.execute() - assert(childRdd != null) - - // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer - // instances within the closure, since Serializer is not serializable while TableDesc is. - val tableDesc = table.tableDesc - val tableLocation = table.hiveQlTable.getDataLocation - val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) - val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) - val rdd = childRdd.mapPartitions { iter => - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray - val outputData = new Array[Any](fieldOIs.length) - iter.map { row => - var i = 0 - while (i < row.length) { - // Casts Strings to HiveVarchars when necessary. - outputData(i) = wrap(row(i), fieldOIs(i)) - i += 1 - } - - serializer.serialize(outputData, standardOI) - } - } - - // ORC stores compression information in table properties. While, there are other formats - // (e.g. RCFile) that rely on hadoop configurations to store compression information. - val jobConf = new JobConf(sc.hiveconf) - saveAsHiveFile( - rdd, - outputClass, - fileSinkConf, - jobConf, - sc.hiveconf.getBoolean("hive.exec.compress.output", false)) - - // TODO: Handle dynamic partitioning. 
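
// The attempt-number trick used in writeToFile above, shown in isolation: Hadoop task
// attempt IDs are 32-bit ints while Spark's attemptId is a Long, so the value is folded
// into Int range with a modulo before being handed to the writer. The sample id below is
// made up for illustration.
object AttemptIdSketch {
  def toHadoopAttemptNumber(sparkAttemptId: Long): Int =
    (sparkAttemptId % Int.MaxValue).toInt

  def main(args: Array[String]): Unit = {
    val hugeAttemptId = 4L * Int.MaxValue + 7          // hypothetical id past Int.MaxValue
    println(toHadoopAttemptNumber(hugeAttemptId))      // prints 7, safely within Int range
  }
}
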
- val outputPath = FileOutputFormat.getOutputPath(jobConf) - // Have to construct the format of dbname.tablename. - val qualifiedTableName = s"${table.databaseName}.${table.tableName}" - // TODO: Correctly set holdDDLTime. - // In most of the time, we should have holdDDLTime = false. - // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint. - val holdDDLTime = false - if (partition.nonEmpty) { - val partitionSpec = partition.map { - case (key, Some(value)) => key -> value - case (key, None) => key -> "" // Should not reach here right now. - } - val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec) - db.validatePartitionNameCharacters(partVals) - // inheritTableSpecs is set to true. It should be set to false for a IMPORT query - // which is currently considered as a Hive native command. - val inheritTableSpecs = true - // TODO: Correctly set isSkewedStoreAsSubdir. - val isSkewedStoreAsSubdir = false - db.loadPartition( - outputPath, - qualifiedTableName, - partitionSpec, - overwrite, - holdDDLTime, - inheritTableSpecs, - isSkewedStoreAsSubdir) - } else { - db.loadTable( - outputPath, - qualifiedTableName, - overwrite, - holdDDLTime) - } - - // It would be nice to just return the childRdd unchanged so insert operations could be chained, - // however for now we return an empty list to simplify compatibility checks with hive, which - // does not return anything for insert operations. - // TODO: implement hive compatibility as rules. - sc.sparkContext.makeRDD(Nil, 1) - } -} - -/** - * :: DeveloperApi :: - */ -@DeveloperApi -case class NativeCommand( - sql: String, output: Seq[Attribute])( - @transient context: HiveContext) - extends LeafNode with Command { - - override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql) - - override def execute(): RDD[spark.sql.Row] = { - if (sideEffectResult.size == 0) { - context.emptyResult - } else { - val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r))) - context.sparkContext.parallelize(rows, 1) - } - } - - override def otherCopyArgs = context :: Nil -} - -/** - * :: DeveloperApi :: - */ -@DeveloperApi -case class DescribeHiveTableCommand( - table: MetastoreRelation, - output: Seq[Attribute], - isExtended: Boolean)( - @transient context: HiveContext) - extends LeafNode with Command { - - // Strings with the format like Hive. It is used for result comparison in our unit tests. - lazy val hiveString: Seq[String] = { - val alignment = 20 - val delim = "\t" - - sideEffectResult.map { - case (name, dataType, comment) => - String.format("%-" + alignment + "s", name) + delim + - String.format("%-" + alignment + "s", dataType) + delim + - String.format("%-" + alignment + "s", Option(comment).getOrElse("None")) - } - } - - override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = { - // Trying to mimic the format of Hive's output. But not exactly the same. 
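
// A sketch of the fixed-width formatting hiveString uses above: each of the three fields
// is left-justified in a 20-character column and the columns are joined with tabs, which
// keeps the output comparable with Hive's own DESCRIBE output in the unit tests. The
// sample rows below are made up for illustration.
object DescribeFormatSketch {
  private val alignment = 20
  private val delim = "\t"

  def formatRow(name: String, dataType: String, comment: String): String =
    Seq(name, dataType, Option(comment).getOrElse("None"))
      .map(field => String.format("%-" + alignment + "s", field))
      .mkString(delim)

  def main(args: Array[String]): Unit = {
    println(formatRow("key", "int", null))
    println(formatRow("value", "string", "payload column"))
  }
}
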
- var results: Seq[(String, String, String)] = Nil - - val columns: Seq[FieldSchema] = table.hiveQlTable.getCols - val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols - results ++= columns.map(field => (field.getName, field.getType, field.getComment)) - if (!partitionColumns.isEmpty) { - val partColumnInfo = - partitionColumns.map(field => (field.getName, field.getType, field.getComment)) - results ++= - partColumnInfo ++ - Seq(("# Partition Information", "", "")) ++ - Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++ - partColumnInfo - } - - if (isExtended) { - results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, "")) - } - - results - } - - override def execute(): RDD[Row] = { - val rows = sideEffectResult.map { - case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment)) - } - context.sparkContext.parallelize(rows, 1) - } - - override def otherCopyArgs = context :: Nil -} From 0a432d6a05d2a8f1f8f5e6e851f5174c98a64d1d Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 21 Jun 2014 13:02:49 -0700 Subject: [PATCH 5/5] HOTFIX: Fix missing MIMA ignore --- dev/mima | 1 + project/MimaExcludes.scala | 2 ++ 2 files changed, 3 insertions(+) diff --git a/dev/mima b/dev/mima index b68800d6d0173..7857294f61caf 100755 --- a/dev/mima +++ b/dev/mima @@ -18,6 +18,7 @@ # set -o pipefail +set -e # Go to the Spark project root directory FWDIR="$(cd `dirname $0`/..; pwd)" diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 042fdfcc47261..af620d61607fa 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,8 @@ object MimaExcludes { case v if v.startsWith("1.1") => Seq(MimaBuild.excludeSparkPackage("graphx")) ++ Seq( + // Adding new method to JavaRDLike trait - we should probably mark this as a developer API. + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"), // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values // for countApproxDistinct* functions, which does not work in Java. We later removed // them, and use the following to tell Mima to not care about them.
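
// A sketch of the per-version exclusion lookup that MimaExcludes.scala implements above,
// with plain strings standing in for MiMa's ProblemFilters so it runs without the MiMa
// plugin on the classpath. The rule text is illustrative; only the JavaRDDLike.partitions
// entry corresponds to the exclusion actually added by this patch.
object MimaExcludesSketch {
  def excludes(version: String): Seq[String] = version match {
    case v if v.startsWith("1.1") =>
      Seq(
        // A new method on a public trait is missing from previously published bytecode,
        // so the binary-compatibility check has to be told to ignore it.
        "MissingMethodProblem: org.apache.spark.api.java.JavaRDDLike.partitions")
    case _ => Seq.empty
  }

  def main(args: Array[String]): Unit =
    println(excludes("1.1.0").mkString("\n"))
}
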