From 8bebf24ad16f63034cb049c718e3d1b6070eea80 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Tue, 7 Oct 2014 15:51:07 -0700
Subject: [PATCH] commented out convertRowToScala for debugging

---
 .../spark/sql/catalyst/ScalaReflection.scala  | 19 ++++++++++++-------
 .../spark/sql/catalyst/types/dataTypes.scala  |  1 -
 .../org/apache/spark/sql/SQLContext.scala     |  1 +
 .../org/apache/spark/sql/SchemaRDD.scala      |  2 +-
 .../spark/sql/execution/SparkPlan.scala       |  2 +-
 .../spark/sql/execution/basicOperators.scala  |  4 ++--
 6 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index e5185c963becb..4d755c25880df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -37,7 +37,8 @@ object ScalaReflection {
 
   /** Converts Scala objects to catalyst rows / types */
   def convertToCatalyst(a: Any, dataType: DataType): Any = (a, dataType) match {
-    case (o: Option[_], oType: _) => convertToCatalyst(o.orNull, oType)
+    // TODO: Why does this not need to flatMap stuff? Does it not support nesting?
+    case (o: Option[_], _) => o.map(convertToCatalyst(_, dataType)).orNull
     case (s: Seq[_], arrayType: ArrayType) => s.map(convertToCatalyst(_, arrayType.elementType))
     case (m: Map[_, _], mapType: MapType) => m.map { case (k, v) =>
       convertToCatalyst(k, mapType.keyType) -> convertToCatalyst(v, mapType.valueType)
@@ -46,25 +47,29 @@ object ScalaReflection {
       p.productIterator.toSeq.zip(structType.fields).map { case (elem, field) =>
         convertToCatalyst(elem, field.dataType)
       }.toArray)
-    case (udt: _, udtType: UDTType) => udtType.
+    case (udt: Any, udtType: UserDefinedType[_]) => udtType.serialize(udt)
     case (d: BigDecimal, _) => Decimal(d)
     case (other, _) => other
   }
 
+  /*
   /** Converts Catalyst types used internally in rows to standard Scala types */
   def convertToScala(a: Any, dataType: DataType): Any = (a, dataType) match {
-    // TODO: USE DATATYPE
+    // TODO: Why does this not need to flatMap stuff? Does it not support nesting?
     // TODO: What about Option and Product?
-    case s: Seq[_] => s.map(convertToScala)
-    case m: Map[_, _] => m.map { case (k, v) => convertToScala(k) -> convertToScala(v) }
+    case (s: Seq[_], arrayType: ArrayType) => s.map(convertToScala(_, arrayType.elementType))
+    case (m: Map[_, _], mapType: MapType) => m.map { case (k, v) =>
+      convertToScala(k, mapType.keyType) -> convertToScala(v, mapType.valueType)
+    }
     case d: Decimal => d.toBigDecimal
-    case (udt: Any, udtType: UserDefinedType[_]) => udtType.serialize(udt)
-    case other => other
+    case (udt: Row, udtType: UserDefinedType[_]) => udtType.deserialize(udt)
+    case (other, _) => other
  }
 
   def convertRowToScala(r: Row, schema: StructType): Row = {
     new GenericRow(r.toArray.map(convertToScala(_, schema)))
   }
+  */
 
   /** Returns a Sequence of attributes for the given case class type. */
   def attributesFor[T: TypeTag](
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 80bd1c4d951cd..ff25d0b136eb7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -595,6 +595,5 @@ abstract class UserDefinedType[UserType](val dataType: StructType) extends DataT
 
   def deserialize(row: Row): UserType
 
-  // TODO
   def simpleString: String = "udt"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 5a2aa9a3dee01..ad70eee42d598 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -295,6 +295,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       udt: UserDefinedType[UserType])(implicit userType: TypeTag[UserType]): Unit = {
     require(!udtRegistry.contains(userType),
       "registerUserType called on type which was already registered.")
+    // TODO: Check to see if type is built-in. Throw exception?
     udtRegistry(userType) = udt
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index f0cee63947721..e455ab5d33aa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -114,7 +114,7 @@ class SchemaRDD(
   // =========================================================================================
 
   override def compute(split: Partition, context: TaskContext): Iterator[Row] =
-    firstParent[Row].compute(split, context).map(ScalaReflection.convertRowToScala(_, this.schema))
+    firstParent[Row].compute(split, context).map(_.copy) //(ScalaReflection.convertRowToScala(_, this.schema))
 
   override def getPartitions: Array[Partition] = firstParent[Row].partitions
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 21967b14617c0..40286eeec8274 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -83,7 +83,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Runs this query returning the result as an array.
    */
   def executeCollect(): Array[Row] = {
-    execute().map(ScalaReflection.convertRowToScala(_, schema)).collect()
+    execute().map(_.copy).collect() //(ScalaReflection.convertRowToScala(_, schema)).collect()
   }
 
   protected def newProjection(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 1b8ba3ace2a82..35378f9ef92da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -143,7 +143,7 @@ case class Limit(limit: Int, child: SparkPlan)
       partsScanned += numPartsToTry
     }
 
-    buf.toArray.map(ScalaReflection.convertRowToScala(_, this.schema))
+    buf.toArray//.map(ScalaReflection.convertRowToScala(_, this.schema))
   }
 
   override def execute() = {
@@ -180,7 +180,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
 
   // TODO: Is this copying for no reason?
   override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ord)
-    .map(ScalaReflection.convertRowToScala(_, this.schema))
+    //.map(ScalaReflection.convertRowToScala(_, this.schema))
 
   // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
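
Note (not part of the patch): the hunks above exercise the work-in-progress UserDefinedType contract, i.e. a StructType-backed dataType passed to the constructor, serialize(...) called from convertToCatalyst, and deserialize(row: Row): UserType. A minimal sketch of a concrete UDT written against that contract could look like the following; Point, PointUDT, the imports, and the exact serialize signature (inferred from the call site udtType.serialize(udt) where udt is Any) are illustrative assumptions, not code from this branch.

    // Illustrative sketch only; the serialize signature is assumed.
    import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row}
    import org.apache.spark.sql.catalyst.types._

    case class Point(x: Double, y: Double)

    class PointUDT extends UserDefinedType[Point](
        StructType(Seq(
          StructField("x", DoubleType, nullable = false),
          StructField("y", DoubleType, nullable = false)))) {

      // User object -> catalyst Row laid out according to dataType.
      def serialize(obj: Any): Row = obj match {
        case Point(x, y) => new GenericRow(Array[Any](x, y))
      }

      // Catalyst Row -> user object (inverse of serialize).
      def deserialize(row: Row): Point = Point(row.getDouble(0), row.getDouble(1))
    }

Such a UDT would then be registered with SQLContext.registerUserType (shown in the SQLContext.scala hunk) so that convertToCatalyst can route Point values through serialize.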