CatalystToExternalMap should support interpreted execution

maropu committed Apr 19, 2018
1 parent 0c94e48 commit ad6762a

Showing 2 changed files with 45 additions and 10 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

@@ -28,12 +28,12 @@ import scala.util.Try
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.serializer._
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.ScalaReflection.universe.TermName
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

@@ -1009,8 +1009,20 @@ case class CatalystToExternalMap private(
   override def children: Seq[Expression] =
     keyLambdaFunction :: valueLambdaFunction :: inputData :: Nil

-  override def eval(input: InternalRow): Any =
-    throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+  private lazy val toScalaValue: Any => Any = {
+    assert(inputData.dataType.isInstanceOf[MapType])
+    val mapType = inputData.dataType.asInstanceOf[MapType]
+    CatalystTypeConverters.createToScalaConverter(mapType)
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val result = inputData.eval(input).asInstanceOf[MapData]
+    if (result != null) {
+      toScalaValue(result)
+    } else {
+      null
+    }
+  }

   override def dataType: DataType = ObjectType(collClass)

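Note: the new eval path above delegates the `MapData`-to-Scala conversion entirely to `CatalystTypeConverters`. A minimal standalone sketch of the round trip that converter performs follows; it is not part of the commit, the object name `MapConverterRoundTrip` is made up, and it assumes compilation inside Spark's sql/catalyst module because `CatalystTypeConverters` is an internal, package-private API.

// Sketch only: round-trips a Scala map through the converter used by the new
// interpreted eval path. Hypothetical object name; assumes Spark's sql/catalyst
// classpath and package, since CatalystTypeConverters is package-private.
package org.apache.spark.sql.catalyst

import org.apache.spark.sql.types.{IntegerType, MapType, StringType}

object MapConverterRoundTrip {
  def main(args: Array[String]): Unit = {
    val mapType = MapType(IntegerType, StringType)

    // Converters in both directions for the same Catalyst type.
    val toCatalyst = CatalystTypeConverters.createToCatalystConverter(mapType)
    val toScala = CatalystTypeConverters.createToScalaConverter(mapType)

    // Scala Map -> Catalyst MapData -> Scala Map; null values survive the trip.
    val data = Map(0 -> "v0", 1 -> "v1", 2 -> null)
    val roundTripped = toScala(toCatalyst(data))
    assert(roundTripped == data)
  }
}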
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala

@@ -27,12 +27,14 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
-import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer, UnresolvedDeserializer}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.objects._
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{SQLDate, SQLTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -148,9 +150,10 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
"fromPrimitiveArray", ObjectType(classOf[Array[Int]]),
Array[Int](1, 2, 3), UnsafeArrayData.fromPrimitiveArray(Array[Int](1, 2, 3))),
(DateTimeUtils.getClass, ObjectType(classOf[Date]),
"toJavaDate", ObjectType(classOf[SQLDate]), 77777, DateTimeUtils.toJavaDate(77777)),
"toJavaDate", ObjectType(classOf[DateTimeUtils.SQLDate]), 77777,
DateTimeUtils.toJavaDate(77777)),
(DateTimeUtils.getClass, ObjectType(classOf[Timestamp]),
"toJavaTimestamp", ObjectType(classOf[SQLTimestamp]),
"toJavaTimestamp", ObjectType(classOf[DateTimeUtils.SQLTimestamp]),
88888888.toLong, DateTimeUtils.toJavaTimestamp(88888888))
).foreach { case (cls, dataType, methodName, argType, arg, expected) =>
checkObjectExprEvaluation(StaticInvoke(cls, dataType, methodName,
@@ -383,6 +386,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }

+<<<<<<< 0c94e48bc50717e1627c0d2acd5382d9adc73c97
   test("LambdaVariable should support interpreted execution") {
     def genSchema(dt: DataType): Seq[StructType] = {
       Seq(StructType(StructField("col_1", dt, nullable = false) :: Nil),
@@ -415,6 +419,25 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       }
     }
   }
+
+  implicit private def mapIntStrEncoder = ExpressionEncoder[Map[Int, String]]()
+
+  test("SPARK-23588 CatalystToExternalMap should support interpreted execution") {
+    // To get a resolved `CatalystToExternalMap` expression, we build a deserializer plan
+    // with a dummy input, resolve the plan with the analyzer, and then replace the dummy
+    // input with a literal for the test.
+    val unresolvedDeser = UnresolvedDeserializer(encoderFor[Map[Int, String]].deserializer)
+    val dummyInputPlan = LocalRelation('value.map(MapType(IntegerType, StringType)))
+    val plan = Project(Alias(unresolvedDeser, "none")() :: Nil, dummyInputPlan)
+
+    val analyzedPlan = SimpleAnalyzer.execute(plan)
+    val Alias(toMapExpr: CatalystToExternalMap, _) = analyzedPlan.expressions.head
+
+    // Replace the dummy input with a literal for the test
+    val data = Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3")
+    val deserializer = toMapExpr.copy(inputData = Literal.create(data))
+    checkObjectExprEvaluation(deserializer, expected = data)
+  }
 }

 class TestBean extends Serializable {
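Usage note (not in the commit): with `deserializer` and `data` as built in the new test above, the interpreted path can also be exercised directly. Before this change the first line below threw `UnsupportedOperationException("Only code-generated evaluation is supported")`; with it, `eval` returns the external Scala map.

// Hypothetical extra assertion, reusing `deserializer` and `data` from the test above.
val interpreted = deserializer.eval(InternalRow.empty)  // interpreted, not code-generated
assert(interpreted == data)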
