Skip to content

Commit

Permalink
Comments and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 17, 2015
1 parent 024d569 commit 7d3222c
Showing 1 changed file with 41 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.lang.{Iterable => JavaIterable}
import java.math.{BigDecimal => JavaBigDecimal}
import java.sql.Date
import java.util.{Map => JavaMap}
import javax.annotation.Nullable

import scala.collection.mutable.HashMap

Expand All @@ -38,7 +39,6 @@ object CatalystTypeConverters {

private def getConverterForType(dataType: DataType): CatalystTypeConverter[Any, Any] = {
val converter = dataType match {
// Check UDT first since UDTs can override other types
case udt: UserDefinedType[_] => UDTConverter(udt)
case arrayType: ArrayType => ArrayConverter(arrayType.elementType)
case mapType: MapType => MapConverter(mapType.keyType, mapType.valueType)
Expand All @@ -58,26 +58,58 @@ object CatalystTypeConverters {
converter.asInstanceOf[CatalystTypeConverter[Any, Any]]
}

/**
* Converts a Scala type to its Catalyst equivalent (and vice versa).
*/
private abstract class CatalystTypeConverter[ScalaType, CatalystType] extends Serializable {

final def toCatalyst(maybeScalaValue: Any): Any = {
/**
* Converts a Scala type to its Catalyst equivalent while automatically handling nulls
* and Options.
*/
final def toCatalyst(@Nullable maybeScalaValue: Any): CatalystType = {
maybeScalaValue match {
case None => null
case null => null
case Some(scalaValue: ScalaType) => toCatalystImpl(scalaValue)
case opt: Option[ScalaType] =>
if (opt.isDefined) {
toCatalystImpl(opt.get)
} else {
null.asInstanceOf[CatalystType]
}
case null => null.asInstanceOf[CatalystType]
case scalaValue: ScalaType => toCatalystImpl(scalaValue)
}
}

/**
* Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
*/
final def toScala(row: Row, column: Int): Any = {
if (row.isNullAt(column)) null else toScalaImpl(row, column)
}

def toScala(catalystValue: CatalystType): ScalaType
/**
* Convert a Catalyst value to its Scala equivalent.
*/
def toScala(@Nullable catalystValue: CatalystType): ScalaType

/**
* Converts a Scala value to its Catalyst equivalent.
* @param scalaValue the Scala value, guaranteed not to be null.
* @return the Catalyst value.
*/
protected def toCatalystImpl(scalaValue: ScalaType): CatalystType

/**
* Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
* This method will only be called on non-null columns.
*/
protected def toScalaImpl(row: Row, column: Int): ScalaType
}

/**
* Convenience wrapper to write type converters for primitives. We use a converter for primitives
* so that we can use type-specific field accessors when converting Catalyst rows to Scala rows.
*/
private abstract class PrimitiveCatalystTypeConverter[T] extends CatalystTypeConverter[T, T] {
override final def toScala(catalystValue: T): T = catalystValue
override final def toCatalystImpl(scalaValue: T): T = scalaValue
Expand Down Expand Up @@ -278,16 +310,11 @@ object CatalystTypeConverters {
* This ordering is important for UDT registration.
*/
def convertToCatalyst(scalaValue: Any, dataType: DataType): Any = {
// Check UDT first since UDTs can override other types
dataType match {
case udt: UserDefinedType[_] => udt.serialize(scalaValue)
case option: Option[_] => option.map(convertToCatalyst(_, dataType)).orNull
case _ => getConverterForType(dataType).toCatalyst(scalaValue)
}
getConverterForType(dataType).toCatalyst(scalaValue)
}

/**
* Creates a converter function that will convert Scala objects to the specified catalyst type.
* Creates a converter function that will convert Scala objects to the specified Catalyst type.
* Typical use case would be converting a collection of rows that have the same schema. You will
* call this function once to get a converter, and apply it to every row.
*/
Expand All @@ -296,7 +323,7 @@ object CatalystTypeConverters {
}

/**
* Converts Scala objects to catalyst rows / types.
* Converts Scala objects to Catalyst rows / types.
*
* Note: This should be called before do evaluation on Row
* (It does not support UDT)
Expand Down

0 comments on commit 7d3222c

Please sign in to comment.