Skip to content

Commit

Permalink
Allow schema calculation to be lazy, but ensure its available on exec…
Browse files Browse the repository at this point in the history
…utors.
  • Loading branch information
marmbrus authored and jkbradley committed Nov 2, 2014
1 parent dff99d6 commit 85872f6
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
16 changes: 10 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,18 @@ class SchemaRDD(

override def getPartitions: Array[Partition] = firstParent[Row].partitions

override protected def getDependencies: Seq[Dependency[_]] =
override protected def getDependencies: Seq[Dependency[_]] = {
schema // Force reification of the schema so it is available on executors.

List(new OneToOneDependency(queryExecution.toRdd))
}

/** Returns the schema of this SchemaRDD (represented by a [[StructType]]).
*
* @group schema
*/
val schema: StructType = queryExecution.analyzed.schema
/**
* Returns the schema of this SchemaRDD (represented by a [[StructType]]).
*
* @group schema
*/
lazy val schema: StructType = queryExecution.analyzed.schema

// =======================================================================
// Query DSL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/** Extends QueryExecution with hive specific features. */
protected[sql] abstract class QueryExecution extends super.QueryExecution {

override lazy val toRdd: RDD[Row] = {
val schema = StructType.fromAttributes(logical.output)
executedPlan.execute().map(ScalaReflection.convertRowToScala(_, schema))
}

protected val primitiveTypes =
Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
ShortType, DateType, TimestampType, BinaryType)
Expand Down Expand Up @@ -433,7 +428,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
command.executeCollect().map(_.head.toString)

case other =>
val result: Seq[Seq[Any]] = toRdd.collect().toSeq
val result: Seq[Seq[Any]] = toRdd.map(_.copy()).collect().toSeq
// We need the types so we can output struct field names
val types = analyzed.output.map(_.dataType)
// Reformat to match hive tab delimited output.
Expand Down

0 comments on commit 85872f6

Please sign in to comment.