[SPARK-20254][SQL] Remove unnecessary data conversion for Dataset with primitive array #17568

Closed · wants to merge 19 commits

Changes from 15 commits
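For context, the conversion this PR removes shows up in code like the following (a sketch based on the PR title and the test case below; assumes a SparkSession named spark with its implicits imported):

    import spark.implicits._

    // Mapping over a Dataset of primitive arrays. Before this change, the
    // deserializer ran a MapObjects loop that boxed every Int into an Object
    // and back, even though the lambda does no per-element work.
    val ds = Seq(Array(1, 2, 3)).toDS()
    ds.map(array => array).collect()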
@@ -2230,8 +2230,8 @@ class Analyzer(
         val result = resolved transformDown {
           case UnresolvedMapObjects(func, inputData, cls) if inputData.resolved =>
             inputData.dataType match {
-              case ArrayType(et, _) =>
-                val expr = MapObjects(func, inputData, et, cls) transformUp {
+              case ArrayType(et, cn) =>
+                val expr = MapObjects(func, inputData, et, cn, cls) transformUp {
                   case UnresolvedExtractValue(child, fieldName) if child.resolved =>
                     ExtractValue(child, fieldName, resolver)
                 }
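For reference, the second field of ArrayType is containsNull; the change above threads it (as cn) into MapObjects so element nullability is preserved:

    ArrayType(IntegerType, containsNull = false)  // elements can never be null
    ArrayType(IntegerType, containsNull = true)   // elements may be null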
@@ -451,18 +451,21 @@ object MapObjects {
    * @param function The function applied on the collection elements.
    * @param inputData An expression that when evaluated returns a collection object.
    * @param elementType The data type of elements in the collection.
+   * @param elementNullable When false, indicates that elements in the collection are
+   *                        always non-null.
    * @param customCollectionCls Class of the resulting collection (returning ObjectType)
    *                            or None (returning ArrayType)
    */
   def apply(
       function: Expression => Expression,
       inputData: Expression,
       elementType: DataType,
+      elementNullable: Boolean = true,
Member: Add @param?
       customCollectionCls: Option[Class[_]] = None): MapObjects = {
     val id = curId.getAndIncrement()
     val loopValue = s"MapObjects_loopValue$id"
     val loopIsNull = s"MapObjects_loopIsNull$id"
-    val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
+    val loopVar = LambdaVariable(loopValue, loopIsNull, elementType, elementNullable)
     MapObjects(
       loopValue, loopIsNull, elementType, function(loopVar), inputData, customCollectionCls)
   }
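A call-site sketch of the new parameter (hypothetical usage, assuming inputData is an in-scope Expression that evaluates to an array):

    // Builds a per-element loop over the input collection; marking the loop
    // variable non-nullable lets downstream rules drop per-element null checks.
    val mapped = MapObjects(
      element => element,        // identity lambda applied to each element
      inputData,                 // expression yielding the collection
      IntegerType,               // element type
      elementNullable = false)   // elements are known to be non-null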
@@ -119,7 +119,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
       CostBasedJoinReorder(conf)) ::
     Batch("Decimal Optimizations", fixedPoint,
       DecimalAggregates(conf)) ::
-    Batch("Typed Filter Optimization", fixedPoint,
+    Batch("Object Expressions Optimization", fixedPoint,
+      EliminateMapObjects,
       CombineTypedFilters) ::
     Batch("LocalRelation", fixedPoint,
       ConvertToLocalRelation,
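For readers unfamiliar with optimizer batches: a fixedPoint batch reapplies its rules until the plan stops changing or an iteration cap is hit. A simplified standalone model of that loop (not Spark's actual RuleExecutor):

    // Applies each rule in order, repeating the whole pass until the plan
    // reaches a fixed point or maxIterations is exhausted.
    def runToFixedPoint[T](plan: T, rules: Seq[T => T], maxIterations: Int = 100): T = {
      var current = plan
      var changed = true
      var i = 0
      while (changed && i < maxIterations) {
        val next = rules.foldLeft(current)((p, rule) => rule(p))
        changed = next != current
        current = next
        i += 1
      }
      current
    }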
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
+import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -368,6 +369,8 @@ case class NullPropagation(conf: SQLConf) extends Rule[LogicalPlan] {
       case EqualNullSafe(Literal(null, _), r) => IsNull(r)
       case EqualNullSafe(l, Literal(null, _)) => IsNull(l)

+      case AssertNotNull(c, _) if !c.nullable => c
Member: Is this safe to do? According to the description of AssertNotNull, even when c is non-nullable we still need to add this assertion in some cases.

Member Author (@kiszk, Apr 17, 2017): I think this is what @cloud-fan suggested in his comment. Is there a better alternative implementation for removing this?

Member: I am not sure whether @cloud-fan's no-op AssertNotNull is the same as the case in AssertNotNull's description.

Contributor: Ah, good catch! Sorry, it was my mistake; but then it seems we cannot remove MapObjects, as the null check still has to be done.

Contributor: Actually, I checked all the usages of AssertNotNull; we never use AssertNotNull to check a non-nullable column/field, so the documentation of AssertNotNull seems to be wrong. Can you double-check?

Member Author: I agree with @cloud-fan. I have also checked the usages of AssertNotNull. IIUC, all of them are used for throwing a runtime exception.

Member: I think the purpose of AssertNotNull is to give a proper exception at runtime when an expression (which can be nullable or non-nullable) evaluates to a null value.

Maybe for MapObjects we can safely remove it, but I am not sure the other cases are okay too.

Contributor: As long as we trust the nullable property, I think it's safe to remove it. We don't use AssertNotNull to validate the input data.

Member: OK. Sounds reasonable to me.
+
       // For Coalesce, remove null literals.
       case e @ Coalesce(children) =>
         val newChildren = children.filterNot(isNullLiteral)
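To make the thread's conclusion concrete, here is a minimal standalone model (hypothetical Expr types, not Spark's classes) of why the new case is a safe rewrite once the nullable flag is trusted:

    sealed trait Expr { def nullable: Boolean }
    case class Column(name: String, nullable: Boolean) extends Expr
    case class AssertNotNull(child: Expr) extends Expr {
      val nullable = false  // the assertion guarantees a non-null result
    }

    // If the child can never produce null, the assertion can never fire,
    // so the simplifier may return the child unchanged.
    def simplify(e: Expr): Expr = e match {
      case AssertNotNull(c) if !c.nullable => c
      case other => other
    }

    // simplify(AssertNotNull(Column("a", nullable = false))) yields the Column itself.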
@@ -19,8 +19,10 @@ package org.apache.spark.sql.catalyst.optimizer

 import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.types._

 /*
  * This file defines optimization rules related to object manipulation (for the Dataset API).
@@ -96,3 +98,22 @@ object CombineTypedFilters extends Rule[LogicalPlan] {
     }
   }
 }
+
+/**
+ * Removes MapObjects when the following conditions are satisfied
+ * 1. Mapobject(e) where e is lambdavariable(), which means types for input output
Member: This comment is obscure. Can we improve it a bit? For example, MapObject(e) is confusing. Shall we clearly say the lambdaFunction of MapObject?

Member Author: Thanks, done.
+ *    are primitive types
+ * 2. no custom collection class specified
+ *    representation of data item. For example back to back map operations.
Member: Can we rephrase this comment too? It looks weird.

Member Author: Thanks, deleted.
+ */
+object EliminateMapObjects extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Contributor: Shall we call plan.transformAllExpressions?

Member Author: Thanks, it works. Done.
+    case DeserializeToObject(Invoke(
+        MapObjects(_, _, _, _ : LambdaVariable, inputData, None),
Contributor: Can we just replace MapObjects with its child? It seems the only reason you match the whole DeserializeToObject is to make sure the returnType is an object type, but that's guaranteed if the collectionClass is None.

Member Author: Replacing MapObjects with its child has type LogicalPlan => Expression, while this method requires LogicalPlan => LogicalPlan. Is it fine to replace Invoke(MapObjects(..., inputData, None), ...) with Invoke(inputData, ...)?

Member Author: @cloud-fan, I misunderstood. Neither Expression nor Invoke is a LogicalPlan. I think we have to replace some of the arguments in DeserializeToObject. What do you think?

Member Author: @cloud-fan Unfortunately, this change caused two test failures in DatasetSuite: "checkAnswer should compare map correctly" and "SPARK-18717: code generation works for both scala.collection.Map and scala.collection.imutable.Map". I will check what is happening very soon.

Member Author: When we run the following code, an exception occurs because UnsafeArrayData.copy(), which is unsupported, gets called:

    val ds = Seq((1, Map(2 -> 3))).toDS.map(t => t)
    ds.collect.toSeq

Member Author: The solution is to use the following matching:

    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
      case MapObjects(_, _, _, LambdaVariable(_, _, _, false), inputData, None) => inputData
    }

The previous pattern did not rule out element conversions such as Object[] -> Integer[], so the generated code incorrectly called UnsafeArrayData.array().
+          funcName, returnType: ObjectType, arguments, propagateNull, returnNullable),
+        outputObjAttr, child) =>
+      DeserializeToObject(Invoke(
+        inputData, funcName, returnType, arguments, propagateNull, returnNullable),
+        outputObjAttr, child)
+  }
+}
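For intuition about the rule's end-to-end effect, a sketch (assumes a SparkSession named spark; the toIntArray invocation matches the expected plans in the new suite below):

    import spark.implicits._

    // Deserializing an Array[Int] column previously went through a MapObjects
    // loop. With EliminateMapObjects, the optimized plan invokes toIntArray
    // on the underlying ArrayData directly, skipping the element-by-element copy.
    val ds = Seq(Array(1, 2, 3)).toDS()
    println(ds.map(a => a).queryExecution.optimizedPlan)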
@@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{DeserializeToObject, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types._

class EliminateMapObjectsSuite extends PlanTest {
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = {
      Batch("EliminateMapObjects", FixedPoint(50),
        NullPropagation(conf),
        SimplifyCasts,
        EliminateMapObjects) :: Nil
    }
  }

  implicit private def intArrayEncoder = ExpressionEncoder[Array[Int]]()
  implicit private def doubleArrayEncoder = ExpressionEncoder[Array[Double]]()

  test("SPARK-20254: Remove unnecessary data conversion for primitive array") {
    val intObjType = ObjectType(classOf[Array[Int]])
    val intInput = LocalRelation('a.array(ArrayType(IntegerType, false)))
    val intQuery = intInput.deserialize[Array[Int]].analyze
    val intOptimized = Optimize.execute(intQuery)
    val intExpected = DeserializeToObject(
      Invoke(intInput.output(0), "toIntArray", intObjType, Nil, true, false),
      AttributeReference("obj", intObjType, true)(), intInput)
    comparePlans(intOptimized, intExpected)

    val doubleObjType = ObjectType(classOf[Array[Double]])
    val doubleInput = LocalRelation('a.array(ArrayType(DoubleType, false)))
    val doubleQuery = doubleInput.deserialize[Array[Double]].analyze
    val doubleOptimized = Optimize.execute(doubleQuery)
    val doubleExpected = DeserializeToObject(
      Invoke(doubleInput.output(0), "toDoubleArray", doubleObjType, Nil, true, false),
      AttributeReference("obj", doubleObjType, true)(), doubleInput)
    comparePlans(doubleOptimized, doubleExpected)
  }
}
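A quick interactive check mirroring the suite (hypothetical REPL usage, with the same imports and implicit encoders as above):

    val input = LocalRelation('a.array(ArrayType(IntegerType, false)))
    val optimized = Optimize.execute(input.deserialize[Array[Int]].analyze)
    // optimized now invokes toIntArray on the input directly; MapObjects is gone.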
}