commented out convertRowToScala for debugging
jkbradley committed Nov 2, 2014
1 parent 53de70f commit 8bebf24
Showing 6 changed files with 17 additions and 12 deletions.
@@ -37,7 +37,8 @@ object ScalaReflection {

/** Converts Scala objects to catalyst rows / types */
def convertToCatalyst(a: Any, dataType: DataType): Any = (a, dataType) match {
case (o: Option[_], oType: _) => convertToCatalyst(o.orNull, oType)
// TODO: Why does this not need to flatMap stuff? Does it not support nesting?
case (o: Option[_], _) => o.map(convertToCatalyst(_, dataType)).orNull
case (s: Seq[_], arrayType: ArrayType) => s.map(convertToCatalyst(_, arrayType.elementType))
case (m: Map[_, _], mapType: MapType) => m.map { case (k, v) =>
convertToCatalyst(k, mapType.keyType) -> convertToCatalyst(v, mapType.valueType)
@@ -46,25 +47,29 @@ object ScalaReflection {
p.productIterator.toSeq.zip(structType.fields).map { case (elem, field) =>
convertToCatalyst(elem, field.dataType)
}.toArray)
case (udt: _, udtType: UDTType) => udtType.
case (udt: Any, udtType: UserDefinedType[_]) => udtType.serialize(udt)
case (d: BigDecimal, _) => Decimal(d)
case (other, _) => other
}

/*
/** Converts Catalyst types used internally in rows to standard Scala types */
def convertToScala(a: Any, dataType: DataType): Any = (a, dataType) match {
// TODO: USE DATATYPE
// TODO: Why does this not need to flatMap stuff? Does it not support nesting?
// TODO: What about Option and Product?
case s: Seq[_] => s.map(convertToScala)
case m: Map[_, _] => m.map { case (k, v) => convertToScala(k) -> convertToScala(v) }
case (s: Seq[_], arrayType: ArrayType) => s.map(convertToScala(_, arrayType.elementType))
case (m: Map[_, _], mapType: MapType) => m.map { case (k, v) =>
convertToScala(k, mapType.keyType) -> convertToScala(v, mapType.valueType)
}
case d: Decimal => d.toBigDecimal
case (udt: Any, udtType: UserDefinedType[_]) => udtType.serialize(udt)
case other => other
case (udt: Row, udtType: UserDefinedType[_]) => udtType.deserialize(udt)
case (other, _) => other
}
def convertRowToScala(r: Row, schema: StructType): Row = {
new GenericRow(r.toArray.map(convertToScala(_, schema)))
}
*/

/** Returns a Sequence of attributes for the given case class type. */
def attributesFor[T: TypeTag](
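For reference, a minimal self-contained sketch of the recursion pattern convertToCatalyst uses above, with toy stand-ins (ToyDataType, ToyArrayType, ToyDecimal) rather than Spark's classes: each case pairs a runtime value with its declared type and recurses into containers, so an optional sequence of BigDecimal ends up as a sequence of the catalyst-style decimal.

object ConvertToCatalystSketch {
  // Toy stand-ins for the Catalyst type objects used by convertToCatalyst above.
  sealed trait ToyDataType
  case object ToyDecimalType extends ToyDataType
  final case class ToyArrayType(elementType: ToyDataType) extends ToyDataType
  final case class ToyDecimal(value: BigDecimal)

  // Same shape as the (value, dataType) match above: unwrap Option, recurse into Seq
  // with the element type, convert BigDecimal, pass everything else through.
  def convert(a: Any, dataType: ToyDataType): Any = (a, dataType) match {
    case (o: Option[_], _)             => o.map(convert(_, dataType)).orNull
    case (s: Seq[_], at: ToyArrayType) => s.map(convert(_, at.elementType))
    case (d: BigDecimal, _)            => ToyDecimal(d)
    case (other, _)                    => other
  }

  def main(args: Array[String]): Unit = {
    val nested = Some(Seq(BigDecimal(1), BigDecimal(2)))
    // Option is unwrapped and each element converted: List(ToyDecimal(1), ToyDecimal(2))
    println(convert(nested, ToyArrayType(ToyDecimalType)))
    // None becomes null, matching the orNull above.
    println(convert(None, ToyDecimalType))
  }
}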
@@ -595,6 +595,5 @@ abstract class UserDefinedType[UserType](val dataType: StructType) extends DataT

def deserialize(row: Row): UserType

// TODO
def simpleString: String = "udt"
}
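As context for the serialize/deserialize pair this commit wires up (serialize in convertToCatalyst above, deserialize declared here), a self-contained sketch of the round trip a concrete UDT is expected to provide. ToyRow, ToyUserDefinedType, and the Point class are made-up stand-ins, simplified from the abstract class shown in the hunk (no dataType), not this branch's API.

object UdtRoundTripSketch {
  final case class ToyRow(values: Seq[Any])

  trait ToyUserDefinedType[UserType] {
    def serialize(obj: Any): ToyRow
    def deserialize(row: ToyRow): UserType
    def simpleString: String = "udt"
  }

  final case class Point(x: Double, y: Double)

  object PointUDT extends ToyUserDefinedType[Point] {
    // Flatten the user object into a row of catalyst-friendly values.
    def serialize(obj: Any): ToyRow = obj match {
      case Point(x, y) => ToyRow(Seq(x, y))
    }
    // Rebuild the user object from that row.
    def deserialize(row: ToyRow): Point = row.values match {
      case Seq(x: Double, y: Double) => Point(x, y)
    }
  }

  def main(args: Array[String]): Unit = {
    val row = PointUDT.serialize(Point(1.0, 2.0)) // ToyRow(List(1.0, 2.0))
    println(PointUDT.deserialize(row))            // Point(1.0,2.0)
  }
}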
@@ -295,6 +295,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
udt: UserDefinedType[UserType])(implicit userType: TypeTag[UserType]): Unit = {
require(!udtRegistry.contains(userType),
"registerUserType called on type which was already registered.")
// TODO: Check to see if type is built-in. Throw exception?
udtRegistry(userType) = udt
}

@@ -114,7 +114,7 @@ class SchemaRDD(
// =========================================================================================

override def compute(split: Partition, context: TaskContext): Iterator[Row] =
firstParent[Row].compute(split, context).map(ScalaReflection.convertRowToScala(_, this.schema))
firstParent[Row].compute(split, context).map(_.copy) //(ScalaReflection.convertRowToScala(_, this.schema))

override def getPartitions: Array[Partition] = firstParent[Row].partitions

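The map(_.copy) that replaces convertRowToScala here and in the executeCollect changes below is most likely needed because physical operators commonly reuse a single mutable row per partition iterator, and the commented-out convertRowToScala built fresh GenericRows as a side effect. A self-contained sketch of that aliasing pitfall, using a toy MutableRow rather than Spark's Row:

object CopyBeforeCollectSketch {
  final class MutableRow(var value: Int) {
    def copy(): MutableRow = new MutableRow(value)
    override def toString: String = s"Row($value)"
  }

  // An "operator" that reuses one mutable row for every element it yields,
  // the way Spark SQL physical operators often do within a partition.
  def compute(): Iterator[MutableRow] = {
    val reused = new MutableRow(0)
    (1 to 3).iterator.map { i => reused.value = i; reused }
  }

  def main(args: Array[String]): Unit = {
    // Without copying, every array slot aliases the same reused row.
    println(compute().toArray.mkString(", "))                // Row(3), Row(3), Row(3)
    // Copying each row before materializing preserves the individual values.
    println(compute().map(_.copy()).toArray.mkString(", "))  // Row(1), Row(2), Row(3)
  }
}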
@@ -83,7 +83,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
* Runs this query returning the result as an array.
*/
def executeCollect(): Array[Row] = {
execute().map(ScalaReflection.convertRowToScala(_, schema)).collect()
execute().map(_.copy).collect() //(ScalaReflection.convertRowToScala(_, schema)).collect()
}

protected def newProjection(
@@ -143,7 +143,7 @@ case class Limit(limit: Int, child: SparkPlan)
partsScanned += numPartsToTry
}

buf.toArray.map(ScalaReflection.convertRowToScala(_, this.schema))
buf.toArray//.map(ScalaReflection.convertRowToScala(_, this.schema))
}

override def execute() = {
@@ -180,7 +180,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)

// TODO: Is this copying for no reason?
override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ord)
.map(ScalaReflection.convertRowToScala(_, this.schema))
//.map(ScalaReflection.convertRowToScala(_, this.schema))

// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.