[SPARK-20254][SQL] Remove unnecessary data conversion for Dataset with primitive array #17568

Closed

wants to merge 19 commits

Changes from 14 commits
@@ -2230,8 +2230,8 @@ class Analyzer(
     val result = resolved transformDown {
       case UnresolvedMapObjects(func, inputData, cls) if inputData.resolved =>
         inputData.dataType match {
-          case ArrayType(et, _) =>
-            val expr = MapObjects(func, inputData, et, cls) transformUp {
+          case ArrayType(et, cn) =>
+            val expr = MapObjects(func, inputData, et, cn, cls) transformUp {
               case UnresolvedExtractValue(child, fieldName) if child.resolved =>
                 ExtractValue(child, fieldName, resolver)
             }
@@ -451,18 +451,21 @@ object MapObjects {
    * @param function The function applied on the collection elements.
    * @param inputData An expression that when evaluated returns a collection object.
    * @param elementType The data type of elements in the collection.
+   * @param elementNullable When false, indicates that elements in the collection are
+   *                        always non-null.
    * @param customCollectionCls Class of the resulting collection (returning ObjectType)
    *                            or None (returning ArrayType)
    */
   def apply(
       function: Expression => Expression,
       inputData: Expression,
       elementType: DataType,
+      elementNullable: Boolean = true,
       customCollectionCls: Option[Class[_]] = None): MapObjects = {
     val id = curId.getAndIncrement()
     val loopValue = s"MapObjects_loopValue$id"
     val loopIsNull = s"MapObjects_loopIsNull$id"
-    val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
+    val loopVar = LambdaVariable(loopValue, loopIsNull, elementType, elementNullable)
     MapObjects(
       loopValue, loopIsNull, elementType, function(loopVar), inputData, customCollectionCls)
   }

Review comment on `elementNullable: Boolean = true`:

  Member: Add @param?
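Because `elementNullable` is defaulted to `true`, existing callers of `MapObjects.apply` keep compiling unchanged, while new callers can thread `ArrayType`'s `containsNull` flag through to the per-element loop variable. A minimal sketch of this pattern, using illustrative toy types rather than Spark's real classes:

```scala
// Illustrative stand-in (not Spark's class) for the loop variable created
// for each collection element; it now carries a nullability flag.
case class LoopVar(value: String, isNull: String, elementType: String,
                   nullable: Boolean)

// Adding a defaulted parameter keeps old call sites source-compatible.
def makeLoopVar(elementType: String, elementNullable: Boolean = true): LoopVar =
  LoopVar("loopValue", "loopIsNull", elementType, elementNullable)

val legacy    = makeLoopVar("int")                          // still nullable by default
val primitive = makeLoopVar("int", elementNullable = false) // primitive array element
```

Marking the loop variable non-nullable is what later enables rules such as NullPropagation to drop redundant null checks for primitive arrays.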
@@ -119,7 +119,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
         CostBasedJoinReorder(conf)) ::
       Batch("Decimal Optimizations", fixedPoint,
         DecimalAggregates(conf)) ::
-      Batch("Typed Filter Optimization", fixedPoint,
+      Batch("Object Expressions Optimization", fixedPoint,
+        EliminateMapObjects,
         CombineTypedFilters) ::
       Batch("LocalRelation", fixedPoint,
         ConvertToLocalRelation,
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
+import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -368,6 +369,8 @@ case class NullPropagation(conf: SQLConf) extends Rule[LogicalPlan] {
       case EqualNullSafe(Literal(null, _), r) => IsNull(r)
       case EqualNullSafe(l, Literal(null, _)) => IsNull(l)

+      case AssertNotNull(c, _) if !c.nullable => c
+
       // For Coalesce, remove null literals.
       case e @ Coalesce(children) =>
         val newChildren = children.filterNot(isNullLiteral)

Review thread on `case AssertNotNull(c, _) if !c.nullable => c`:

  Member: Is this safe to do? According to the description of AssertNotNull, even when c is non-nullable, we still need to add this assertion in some cases.

  Member Author (@kiszk, Apr 17, 2017): I think this is what @cloud-fan suggested in his comment. Is there a better alternative implementation for removing this?

  Member: I am not sure whether @cloud-fan's no-op AssertNotNull is the same as the case in AssertNotNull's description.

  Contributor: Ah, good catch! Sorry, it was my mistake; but then it seems we cannot remove MapObjects, since the null check has to be done.

  Contributor: Actually, I checked all the usages of AssertNotNull: we never use AssertNotNull to check a non-nullable column/field, so the documentation of AssertNotNull seems to be wrong. Can you double-check?

  Member Author: I agree with @cloud-fan. I have also checked the usages of AssertNotNull. IIUC, all of them are used to throw a runtime exception.

  Member: I think the purpose of AssertNotNull is to give a proper exception at runtime when an expression (which can be either a nullable or a non-nullable expression) evaluates to a null value. Maybe for MapObjects we can safely remove it, but I am not sure it is okay in other cases too.

  Contributor: As long as we trust the nullable property, I think it is safe to remove it. We don't use AssertNotNull to validate the input data.

  Member: OK, sounds reasonable to me.
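The thread above hinges on trusting the `nullable` property. A tiny self-contained sketch (toy classes, not Spark's) of why an `AssertNotNull` whose child is non-nullable is a no-op that a null-propagation rule can safely drop:

```scala
// Toy expressions carrying a nullability flag; all names are illustrative only.
sealed trait Expr { def nullable: Boolean }
case class Column(name: String, nullable: Boolean) extends Expr
case class AssertNotNull(child: Expr) extends Expr {
  val nullable = false  // the assertion guarantees a non-null result
}

// If the child can never be null, the assertion can never fire, so the node can
// be removed -- provided the nullable property is trustworthy, which is exactly
// the point debated in the review thread above.
def propagateNulls(e: Expr): Expr = e match {
  case AssertNotNull(c) if !c.nullable => c
  case other => other
}
```

For a nullable child the assertion is kept, since it is the mechanism that turns an unexpected null into a descriptive runtime exception.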
@@ -19,8 +19,10 @@ package org.apache.spark.sql.catalyst.optimizer

 import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.types._

 /*
  * This file defines optimization rules related to object manipulation (for the Dataset API).
@@ -96,3 +98,30 @@ object CombineTypedFilters extends Rule[LogicalPlan] {
     }
   }
 }
+
+/**
+ * Removes MapObjects when the following conditions are satisfied
+ *   1. Mapobject(e) where e is lambdavariable(), which means types for input and output
+ *      are primitive types
+ *   2. no custom collection class specified
+ *      representation of data item. For example back to back map operations.
+ */
+object EliminateMapObjects extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case _ @ DeserializeToObject(Invoke(
+        MapObjects(_, _, _, Cast(LambdaVariable(_, _, dataType, _), castDataType, _),
+          inputData, None),
+        funcName, returnType: ObjectType, arguments, propagateNull, returnNullable),
+        outputObjAttr, child) if dataType == castDataType =>
+      DeserializeToObject(Invoke(
+        inputData, funcName, returnType, arguments, propagateNull, returnNullable),
+        outputObjAttr, child)
+    case _ @ DeserializeToObject(Invoke(
+        MapObjects(_, _, _, LambdaVariable(_, _, dataType, _), inputData, None),
+        funcName, returnType: ObjectType, arguments, propagateNull, returnNullable),
+        outputObjAttr, child) =>
+      DeserializeToObject(Invoke(
+        inputData, funcName, returnType, arguments, propagateNull, returnNullable),
+        outputObjAttr, child)
+  }
+}

Review threads:

On the scaladoc (`Mapobject(e) where e is lambdavariable() ...`):

  Member: This comment is obscure. Can we improve it a bit? For example, MapObject(e) is confusing. Shall we clearly say the lambdaFunction of MapObject?

  Member Author: Thanks, done.

On `* representation of data item. For example back to back map operations.`:

  Member: Is this comment broken?

On `plan transform`:

  Contributor: Shall we call plan.transformAllExpressions?

  Member Author: Thanks, it works. Done.

On the `Cast(LambdaVariable(...), castDataType, _)` pattern:

  Contributor: I think Cast should be eliminated when running this rule?

  Member Author (@kiszk, Apr 16, 2017): Yes, I eliminate Cast in this rule, too. This Cast seems to have been added by a recent commit.

  Member Author: Oh, do you mean
    1. Cast should be eliminated by this rule?; or
    2. Cast should be eliminated before applying this rule?

On `if dataType == castDataType`:

  Member: I think we can remove this case. Cast has been removed by SimplifyCasts.

  Member Author: For now, as you pointed out, Cast has been removed by SimplifyCasts. I leave this in for robustness; in the future, this optimization may be executed before SimplifyCasts by reordering. What do you think? cc @cloud-fan

  Member: The order does not matter. The batch will be run multiple times.

  Member Author: I see.

On the second `case _ @ DeserializeToObject(...)`:

  Member: Nit: `_ @ DeserializeToObject` -> `DeserializeToObject`

On `MapObjects(_, _, _, LambdaVariable(_, _, dataType, _), inputData, None)`:

  Member (@gatorsmile, Apr 16, 2017): dataType is not being used. Basically, this rule is to get rid of MapObjects when no function is applied to LambdaVariable. Do we still need to check whether customCollectionCls is equal to None? Any other scenario besides type casting?

  Member Author: As @cloud-fan pointed out in this comment, it is necessary. customCollectionCls was introduced by #16541; it is not equal to None when Seq() is used.

  Member: Ok, for safety, we can keep it.
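The rule above pattern-matches a `DeserializeToObject(Invoke(MapObjects(...)))` tree and keeps only the raw input when the lambda is the identity. The core idea can be sketched in a few lines on a toy expression tree (illustrative names only, not Spark's real classes):

```scala
// Toy AST: just enough structure to show the shape of the rewrite.
sealed trait Expr
case class Input(name: String) extends Expr
case class LambdaVar(name: String) extends Expr
// `customCls = None` corresponds to "no custom collection class specified".
case class MapObjs(lambdaBody: Expr, input: Expr, customCls: Option[String]) extends Expr

// When the lambda body is just the loop variable itself (an identity function
// over primitive elements) and no custom collection class is requested, the
// whole MapObjs node is a no-op and the raw input can be used directly.
def eliminateMapObjs(e: Expr): Expr = e match {
  case MapObjs(_: LambdaVar, input, None) => eliminateMapObjs(input)
  case other => other
}
```

For example, `eliminateMapObjs(MapObjs(LambdaVar("x"), Input("a"), None))` returns `Input("a")`, while a node carrying a custom collection class (e.g. the `Seq` case from #16541) is left untouched because the element-by-element conversion is still needed there.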
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{DeserializeToObject, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types._

class EliminateMapObjectsSuite extends PlanTest {
  class Optimize(addSimplifyCast: Boolean) extends RuleExecutor[LogicalPlan] {
    val batches = if (addSimplifyCast) {
      Batch("EliminateMapObjects", FixedPoint(50),
        NullPropagation(conf),
        SimplifyCasts,
        EliminateMapObjects) :: Nil
    } else {
      Batch("EliminateMapObjects", FixedPoint(50),
        NullPropagation(conf),
        EliminateMapObjects) :: Nil
    }
  }

  implicit private def intArrayEncoder = ExpressionEncoder[Array[Int]]()
  implicit private def doubleArrayEncoder = ExpressionEncoder[Array[Double]]()

  test("SPARK-20254: Remove unnecessary data conversion for primitive array") {
    val intObjType = ObjectType(classOf[Array[Int]])
    val intInput = LocalRelation('a.array(ArrayType(IntegerType, false)))
    val intQuery = intInput.deserialize[Array[Int]].analyze
    Seq(true, false).foreach { addSimplifyCast =>
      val intOptimized = new Optimize(addSimplifyCast).execute(intQuery)
      val intExpected = DeserializeToObject(
        Invoke(intInput.output(0), "toIntArray", intObjType, Nil, true, false),
        AttributeReference("obj", intObjType, true)(), intInput)
      comparePlans(intOptimized, intExpected)
    }

    val doubleObjType = ObjectType(classOf[Array[Double]])
    val doubleInput = LocalRelation('a.array(ArrayType(DoubleType, false)))
    val doubleQuery = doubleInput.deserialize[Array[Double]].analyze
    Seq(true, false).foreach { addSimplifyCast =>
      val doubleOptimized = new Optimize(addSimplifyCast).execute(doubleQuery)
      val doubleExpected = DeserializeToObject(
        Invoke(doubleInput.output(0), "toDoubleArray", doubleObjType, Nil, true, false),
        AttributeReference("obj", doubleObjType, true)(), doubleInput)
      comparePlans(doubleOptimized, doubleExpected)
    }
  }
}
@@ -20,6 +20,8 @@ package org.apache.spark.sql

 import scala.collection.immutable.Queue
 import scala.collection.mutable.ArrayBuffer

+import org.apache.spark.sql.catalyst.expressions.objects.Invoke
+import org.apache.spark.sql.execution.DeserializeToObjectExec
 import org.apache.spark.sql.test.SharedSQLContext

 case class IntClass(value: Int)

Review comment on the two added imports:

  Member: Revert the above two lines?