diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 1e792768b7c96..760348052739c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -23,20 +23,31 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.ObjectType /** - * A trait for logical operators that operate on user defined objects. + * A trait for logical operators that apply user defined functions to domain objects. */ trait ObjectOperator extends LogicalPlan { + + /** The serializer that is used to produce the output of this operator. */ def serializer: Seq[NamedExpression] + /** + * The object type that is produced by the user defined function. Note that the return type here + * is the same whether or not the operator is output serialized data. + */ def outputObject: NamedExpression = Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")() + /** + * Returns a copy of this operator that will produce an object instead of an encoded row. + * Used in the optimizer when transforming plans to remove unneeded serialization. + */ def withObjectOutput: LogicalPlan = if (output.head.dataType.isInstanceOf[ObjectType]) { this } else { withNewSerializer(outputObject) } + /** Returns a copy of this operator with a different serializer. */ def withNewSerializer(newSerializer: NamedExpression): LogicalPlan = makeCopy { productIterator.map { case c if c == serializer => newSerializer :: Nil @@ -85,8 +96,9 @@ object AppendColumns { /** * A relation produced by applying `func` to each partition of the `child`, concatenating the - * resulting columns at the end of the input row. tEncoder/uEncoder are used respectively to - * decode/encode from the JVM object representation expected by `func.` + * resulting columns at the end of the input row. + * @param input used to extract the input to `func` from an input row. + * @param serializer use to serialize the output of `func`. */ case class AppendColumns( func: Any => Any, @@ -117,6 +129,9 @@ object MapGroups { * Applies func to each unique group in `child`, based on the evaluation of `groupingAttributes`. * Func is invoked with an object representation of the grouping key an iterator containing the * object representation of all the rows with that key. + * @param keyObject used to extract the key object for each group. + * @param input used to extract the items in the iterator from an input row. + * @param serializer use to serialize the output of `func`. */ case class MapGroups( func: (Any, Iterator[Any]) => TraversableOnce[Any],