[SPARK-20254][SQL] Remove unnecessary data conversion for Dataset with primitive array #17568
Conversation
Test build #75610 has finished for PR 17568 at commit
@cloud-fan could you please review this?
@@ -2230,6 +2230,8 @@ class Analyzer(
    val result = resolved transformDown {
      case UnresolvedMapObjects(func, inputData, cls) if inputData.resolved =>
        inputData.dataType match {
          case ArrayType(et, false) if cls.isEmpty =>
Is it really safe to do so? `MapObjects` is not only used for null checking, but also to resolve structs in arrays.
To be safe, we should check:
1. no custom collection class is specified
2. the `function` will convert an expression `e` to `AssertNotNull(e)` (this guarantees we are expecting a primitive array)
3. the `inputData` is of type array and its element is not nullable.
I agree with you. I missed 2. Am I correct?
1. Already done by `cls.isEmpty`.
2. We have to check whether `et` is a primitive type or not at line 2233.
3. Already done by `ArrayType(et, false)`.
yea
Test build #75624 has finished for PR 17568 at commit
@cloud-fan how about this check for 2.?
ping @cloud-fan
@@ -2230,6 +2230,8 @@ class Analyzer(
    val result = resolved transformDown {
      case UnresolvedMapObjects(func, inputData, cls) if inputData.resolved =>
        inputData.dataType match {
          case ArrayType(et, false) if cls.isEmpty &&
            CatalystTypeConverters.isPrimitive(et) => Cast(inputData, inputData.dataType)
If the expected array element type is the same as the actual array element type, and the custom collection class is empty, then we can say that this `MapObjects` is a no-op and we can remove it.
The information about the expected array element type is actually hidden in the map function, so I think it's hard to do the check here. How about we add an optimizer rule: if `MapObjects.lambdaFunction` is a `LambdaVariable`, we eliminate this `MapObjects`.
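A rough sketch of the rule being proposed here; it is close to what this PR converges on further down the thread, but the exact `MapObjects` extractor shape and the imports are assumptions on my part:

```scala
import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object EliminateMapObjects extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    // If the lambda function is just the loop variable itself and no custom
    // collection class is requested, the per-element mapping does no work,
    // so the input collection can be used directly.
    case MapObjects(_, _, _, _: LambdaVariable, inputData, None) => inputData
  }
}
```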
BTW, the `et` here is the actual array element type, not the expected one.
I see. I will work on adding this optimizer rule.
Test build #75669 has finished for PR 17568 at commit
@cloud-fan I would appreciate it if you have time to look at this.
@cloud-fan we would appreciate it if this could be merged into Spark 2.2.
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
  case _ @ DeserializeToObject(_ @ Invoke(
    MapObjects(_, _, inputType, args, inputData, customCollectionCls, _),
I think we can just match `MapObjects`; it's quite self-contained, so we don't need other information to decide whether we can remove `MapObjects`.
Sure, done.
/**
 * Removes MapObjects when the following conditions are satisfied
 * 1. Mapobject(e) where e is lambdavariable
 * 2. the function will convert an expression MapObjects(e) to AssertNotNull(e)
If we have `AssertNotNull`, then it's not a no-op and we cannot eliminate this `MapObjects`. We may need to update `NullPropagation` to eliminate a no-op `AssertNotNull`.
Sure, done too.
 * Removes MapObjects when the following conditions are satisfied
 * 1. Mapobject(e) where e is lambdavariable
 * 2. the function will convert an expression MapObjects(e) to AssertNotNull(e)
 * 3. the inputData is of primitive type array and its element is not nullable.
The type doesn't matter; we just need to check whether the map function is a no-op. If the type is not primitive, e.g. String, Timestamp, etc., the map function will be an `Invoke`.
Test build #75804 has finished for PR 17568 at commit
@@ -368,6 +369,8 @@ case class NullPropagation(conf: SQLConf) extends Rule[LogicalPlan] {
      case EqualNullSafe(Literal(null, _), r) => IsNull(r)
      case EqualNullSafe(l, Literal(null, _)) => IsNull(l)

      case a @ AssertNotNull(c, _) if !c.nullable => c
`a` is not used
Good catch. Done.
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
  case _ @ DeserializeToObject(_ @ Invoke(
`case MapObjects(_, _, _, _: LambdaVariable, inputData, None, _) => inputData`
can this work?
Yes, I can do that.
@@ -253,4 +256,27 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
    checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1))
  }

  test("SPARK-20254: Remove unnecessary data conversion for primitive array") {
We usually don't test the optimizer in end-to-end tests; see `TypedFilterOptimizationSuite` as an example.
Thank you for pointing it out. I implemented non-e2e tests.
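A sketch of what such a plan-level check might look like, in the style of `TypedFilterOptimizationSuite`; the names and the exact assertion are assumptions on my part, not the actual test added in this PR:

```scala
import org.apache.spark.sql.catalyst.expressions.objects.MapObjects
import org.apache.spark.sql.catalyst.plans.logical.DeserializeToObject

// Assumes a SparkSession `spark` with `spark.implicits._` in scope.
val ds = spark.createDataset(Seq(Array(1.1, 2.2))).map(e => e)

// Collect the deserializer expressions from the optimized plan and assert that
// no MapObjects survives for the primitive-array case.
val deserializers = ds.queryExecution.optimizedPlan.collect {
  case d: DeserializeToObject => d.deserializer
}
assert(deserializers.forall(_.find(_.isInstanceOf[MapObjects]).isEmpty))
```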
Test build #75807 has finished for PR 17568 at commit
Test build #75814 has finished for PR 17568 at commit
Test build #75822 has finished for PR 17568 at commit
@@ -262,5 +265,4 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
    import packageobject._
    checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1))
  }
Revert these changes?
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
Revert these changes?
@@ -40,7 +40,7 @@ object CatalystTypeConverters {
  // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
  import scala.collection.Map

  private def isPrimitive(dataType: DataType): Boolean = {
  def isPrimitive(dataType: DataType): Boolean = {
Revert this change?
@@ -437,11 +437,12 @@ object MapObjects {
      function: Expression => Expression,
      inputData: Expression,
      elementType: DataType,
      elementNullable: Boolean = true,
Add `@param`?
 */
object EliminateMapObjects extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case _ @ DeserializeToObject(_ @ Invoke(
Nit: case DeserializeToObject(Invoke(
case _ @ DeserializeToObject(_ @ Invoke(
  MapObjects(_, _, _, Cast(LambdaVariable(_, _, dataType, _), castDataType, _),
    inputData, None, _),
  funcName, returnType @ ObjectType(_), arguments, propagateNull, returnNullable),
Nit: returnType: ObjectType
Thank you. I addressed all of your comments.
Test build #75825 has finished for PR 17568 at commit
@@ -368,6 +369,8 @@ case class NullPropagation(conf: SQLConf) extends Rule[LogicalPlan] {
      case EqualNullSafe(Literal(null, _), r) => IsNull(r)
      case EqualNullSafe(l, Literal(null, _)) => IsNull(l)

      case _ @ AssertNotNull(c, _) if !c.nullable => c
nit: case AssertNotNull(c, _) if !c.nullable => c
object EliminateMapObjects extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case _ @ DeserializeToObject(Invoke(
      MapObjects(_, _, _, Cast(LambdaVariable(_, _, dataType, _), castDataType, _),
I think `Cast` should be eliminated when running this rule?
Yes, I eliminate `Cast` in this rule, too. This `Cast` seems to have been added by a recent commit.
Oh, do you mean:
1. `Cast` should be eliminated by this rule?; or
2. `Cast` should be eliminated before applying this rule?
@@ -20,6 +20,8 @@ package org.apache.spark.sql
import scala.collection.immutable.Queue
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.execution.DeserializeToObjectExec
Revert the above two lines?
MapObjects(_, _, _, Cast(LambdaVariable(_, _, dataType, _), castDataType, _),
    inputData, None),
  funcName, returnType: ObjectType, arguments, propagateNull, returnNullable),
  outputObjAttr, child) if dataType == castDataType =>
I think we can remove this case. `Cast` has been removed by `SimplifyCasts`.
For now, as you pointed out, `Cast` has been removed by `SimplifyCasts`.
I left this case in for robustness: in the future, this optimization may be executed before `SimplifyCasts` if the rules are reordered.
What do you think? cc: @cloud-fan
The order does not matter. The batch will be run multiple times.
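For background, optimizer rules run inside fixed-point batches that are re-applied until the plan stops changing. A minimal sketch of such a batch; the batch name, iteration limit, and rule membership below are assumptions for illustration, not the exact Optimizer code:

```scala
import org.apache.spark.sql.catalyst.optimizer.{EliminateMapObjects, SimplifyCasts}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

object IllustrativeOptimizer extends RuleExecutor[LogicalPlan] {
  // Because the batch runs to a fixed point, SimplifyCasts and EliminateMapObjects
  // do not need a particular relative order within it.
  val batches = Batch("Object Optimizations", FixedPoint(100),
    SimplifyCasts,
    EliminateMapObjects) :: Nil
}
```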
I see
      inputData, funcName, returnType, arguments, propagateNull, returnNullable),
      outputObjAttr, child)
  case _ @ DeserializeToObject(Invoke(
    MapObjects(_, _, _, LambdaVariable(_, _, dataType, _), inputData, None),
`dataType` is not being used. Basically, this rule is to get rid of `MapObjects` when no function is applied to the `LambdaVariable`. Do we still need to check whether `customCollectionCls` is equal to `None`? Any other scenario besides type casting?
As @cloud-fan pointed out in this comment, it is necessary. `customCollectionCls` was introduced by #16541. It is not equal to `None` when `Seq()` is used.
Ok, for safety, we can keep it.
    DeserializeToObject(Invoke(
      inputData, funcName, returnType, arguments, propagateNull, returnNullable),
      outputObjAttr, child)
  case _ @ DeserializeToObject(Invoke(
Nit: `_ @ DeserializeToObject` -> `DeserializeToObject`
@@ -368,6 +369,8 @@ case class NullPropagation(conf: SQLConf) extends Rule[LogicalPlan] {
      case EqualNullSafe(Literal(null, _), r) => IsNull(r)
      case EqualNullSafe(l, Literal(null, _)) => IsNull(l)

      case AssertNotNull(c, _) if !c.nullable => c
Is this safe to do? According to the description of `AssertNotNull`, even if `c` is non-nullable, we still need to add this assertion in some cases.
I think this is what @cloud-fan suggested in his comment.
Is there a better alternative implementation for removing this?
I am not sure whether @cloud-fan's no-op `AssertNotNull` is the same as the case in `AssertNotNull`'s description.
Ah, good catch! Sorry, it was my mistake, but then it seems we cannot remove `MapObjects`, as the null check has to be done.
Actually, I checked all the usages of `AssertNotNull`; we never use `AssertNotNull` to check a non-nullable column/field, so it seems the documentation of `AssertNotNull` is wrong. Can you double check?
I agree with @cloud-fan. I have also checked the usages of `AssertNotNull`. IIUC, all of them are used for throwing a runtime exception.
I think the purpose of `AssertNotNull` is to give a proper exception at runtime when an expression (note: it can be a nullable or non-nullable expression) evaluates to a null value.
Maybe for `MapObjects` we can safely remove it, but I am not sure it is okay in other cases too.
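As a hedged illustration of when the assertion does real work (a hypothetical example, not code from this PR): deserializing a nullable column into a non-nullable Scala field goes through `AssertNotNull`, so a null value fails with a descriptive runtime error rather than a silent `NullPointerException`.

```scala
// Assumes a SparkSession with spark.implicits._ in scope.
case class Point(x: Int, y: Int)

val df = Seq((Some(1), 2), (None, 4)).toDF("x", "y")
// The Int field cannot hold null, so the deserializer wraps the extracted value
// in AssertNotNull; the second row fails at runtime with a
// "Null value appeared in non-nullable field"-style error.
df.as[Point].collect()
```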
As long as we trust the `nullable` property, I think it's safe to remove it. We don't use `AssertNotNull` to validate the input data.
OK. Sounds reasonable to me.
 * 1. Mapobject(e) where e is lambdavariable(), which means types for input output
 *    are primitive types
 * 2. no custom collection class specified
 * representation of data item. For example back to back map operations.
Is this comment broken?
Test build #75850 has finished for PR 17568 at commit
object EliminateMapObjects extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case DeserializeToObject(Invoke(
      MapObjects(_, _, _, _ : LambdaVariable, inputData, None),
Can we just replace `MapObjects` with its child? It seems the only reason you match the whole `DeserializeToObject` is to make sure the `returnType` is an object type, but that's guaranteed if the `collectionClass` is `None`.
Replacing `MapObjects` with its child has type `LogicalPlan => Expression`, while this method requires `LogicalPlan => LogicalPlan`.
Is it fine to replace `Invoke(MapObjects(..., inputData, None), ...)` with `Invoke(inputData, ...)`?
@cloud-fan, I misunderstood. Neither `Expression` nor `Invoke` is a `LogicalPlan`.
I think that we have to replace some of the arguments in `DeserializeToObject`.
What do you think?
@cloud-fan Unfortunately, this change caused two test failures in `DatasetSuite`:
"checkAnswer should compare map correctly" and "SPARK-18717: code generation works for both scala.collection.Map and scala.collection.imutable.Map".
I will check what's happening very soon.
When we run the following code, an exception occurs. This is because `UnsafeArrayData.copy()`, which is unsupported, is called.

val ds = Seq((1, Map(2 -> 3))).toDS.map(t => t)
ds.collect.toSeq
The solution is to use the following matching:

def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
  case MapObjects(_, _, _, LambdaVariable(_, _, _, false), inputData, None) => inputData
}

The previous matching avoided cases such as `Object[] -> Integer[]`; without that restriction, the generated code incorrectly called `UnsafeArrayData.array()`.
 * representation of data item. For example back to back map operations.
 */
object EliminateMapObjects extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Shall we call `plan.transformAllExpressions`?
Thanks, it works. Done.
Test build #75892 has finished for PR 17568 at commit
@cloud-fan, would it be possible to review this again?
    DeserializeToObject(Invoke(
      inputData, funcName, returnType, arguments, propagateNull, returnNullable),
      outputObjAttr, child)
*/
remove these?
I am sorry for my mistake. Removed.
LGTM
/**
 * Removes MapObjects when the following conditions are satisfied
 * 1. Mapobject(e) where e is lambdavariable(), which means types for input output
This comment is obscure. Can we improve it a bit?
For example, `MapObject(e)` is confusing. Shall we clearly say the `lambdaFunction` of `MapObjects`?
thanks, done
 * 1. Mapobject(e) where e is lambdavariable(), which means types for input output
 *    are primitive types
 * 2. no custom collection class specified
 * representation of data item. For example back to back map operations.
Can we rephrase this comment too? It looks weird.
thanks, deleted
LGTM except for minor comments on the code comments.
Test build #75895 has finished for PR 17568 at commit
Test build #75899 has finished for PR 17568 at commit
thanks, merging to master/2.2!
…h primitive array ## What changes were proposed in this pull request? This PR elminates unnecessary data conversion, which is introduced by SPARK-19716, for Dataset with primitve array in the generated Java code. When we run the following example program, now we get the Java code "Without this PR". In this code, lines 56-82 are unnecessary since the primitive array in ArrayData can be converted into Java primitive array by using ``toDoubleArray()`` method. ``GenericArrayData`` is not required. ```java val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache ds.count ds.map(e => e).show ``` Without this PR ``` == Parsed Logical Plan == 'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25] +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D +- 'DeserializeToObject unresolveddeserializer(unresolvedmapobjects(<function1>, getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), obj#23: [D +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2] +- ExternalRDD [obj#1] == Analyzed Logical Plan == value: array<double> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25] +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D +- DeserializeToObject mapobjects(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue5).toDoubleArray, obj#23: [D +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2] +- ExternalRDD [obj#1] == Optimized Logical Plan == SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25] +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D +- DeserializeToObject mapobjects(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue5).toDoubleArray, obj#23: [D +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2] +- Scan ExternalRDDScan[obj#1] == Physical Plan == *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25] +- *MapElements <function1>, obj#24: [D +- *DeserializeToObject mapobjects(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, 
assertnotnull(lambdavariable(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue5).toDoubleArray, obj#23: [D +- InMemoryTableScan [value#2] +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2] +- Scan ExternalRDDScan[obj#1] ``` ```java /* 050 */ protected void processNext() throws java.io.IOException { /* 051 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 052 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 053 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 054 */ ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0)); /* 055 */ /* 056 */ ArrayData deserializetoobject_value1 = null; /* 057 */ /* 058 */ if (!inputadapter_isNull) { /* 059 */ int deserializetoobject_dataLength = inputadapter_value.numElements(); /* 060 */ /* 061 */ Double[] deserializetoobject_convertedArray = null; /* 062 */ deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength]; /* 063 */ /* 064 */ int deserializetoobject_loopIndex = 0; /* 065 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 066 */ MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex)); /* 067 */ MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex); /* 068 */ /* 069 */ if (MapObjects_loopIsNull2) { /* 070 */ throw new RuntimeException(((java.lang.String) references[0])); /* 071 */ } /* 072 */ if (false) { /* 073 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null; /* 074 */ } else { /* 075 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2; /* 076 */ } /* 077 */ /* 078 */ deserializetoobject_loopIndex += 1; /* 079 */ } /* 080 */ /* 081 */ deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/ /* 082 */ } /* 083 */ boolean deserializetoobject_isNull = true; /* 084 */ double[] deserializetoobject_value = null; /* 085 */ if (!inputadapter_isNull) { /* 086 */ deserializetoobject_isNull = false; /* 087 */ if (!deserializetoobject_isNull) { /* 088 */ Object deserializetoobject_funcResult = null; /* 089 */ deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray(); /* 090 */ if (deserializetoobject_funcResult == null) { /* 091 */ deserializetoobject_isNull = true; /* 092 */ } else { /* 093 */ deserializetoobject_value = (double[]) deserializetoobject_funcResult; /* 094 */ } /* 095 */ /* 096 */ } /* 097 */ deserializetoobject_isNull = deserializetoobject_value == null; /* 098 */ } /* 099 */ /* 100 */ boolean mapelements_isNull = true; /* 101 */ double[] mapelements_value = null; /* 102 */ if (!false) { /* 103 */ mapelements_resultIsNull = false; /* 104 */ /* 105 */ if (!mapelements_resultIsNull) { /* 106 */ mapelements_resultIsNull = deserializetoobject_isNull; /* 107 */ mapelements_argValue = deserializetoobject_value; /* 108 */ } /* 109 */ /* 110 */ mapelements_isNull = mapelements_resultIsNull; /* 111 */ if (!mapelements_isNull) { /* 112 */ Object mapelements_funcResult = null; /* 113 */ mapelements_funcResult = ((scala.Function1) 
references[1]).apply(mapelements_argValue); /* 114 */ if (mapelements_funcResult == null) { /* 115 */ mapelements_isNull = true; /* 116 */ } else { /* 117 */ mapelements_value = (double[]) mapelements_funcResult; /* 118 */ } /* 119 */ /* 120 */ } /* 121 */ mapelements_isNull = mapelements_value == null; /* 122 */ } /* 123 */ /* 124 */ serializefromobject_resultIsNull = false; /* 125 */ /* 126 */ if (!serializefromobject_resultIsNull) { /* 127 */ serializefromobject_resultIsNull = mapelements_isNull; /* 128 */ serializefromobject_argValue = mapelements_value; /* 129 */ } /* 130 */ /* 131 */ boolean serializefromobject_isNull = serializefromobject_resultIsNull; /* 132 */ final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue); /* 133 */ serializefromobject_isNull = serializefromobject_value == null; /* 134 */ serializefromobject_holder.reset(); /* 135 */ /* 136 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 137 */ /* 138 */ if (serializefromobject_isNull) { /* 139 */ serializefromobject_rowWriter.setNullAt(0); /* 140 */ } else { /* 141 */ // Remember the current cursor so that we can calculate how many bytes are /* 142 */ // written later. /* 143 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 144 */ /* 145 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 146 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 147 */ // grow the global buffer before writing data. /* 148 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 149 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 150 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 151 */ /* 152 */ } else { /* 153 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 154 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8); /* 155 */ /* 156 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 157 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 158 */ serializefromobject_arrayWriter.setNullDouble(serializefromobject_index); /* 159 */ } else { /* 160 */ final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index); /* 161 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 162 */ } /* 163 */ } /* 164 */ } /* 165 */ /* 166 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 167 */ } /* 168 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 169 */ append(serializefromobject_result); /* 170 */ if (shouldStop()) return; /* 171 */ } /* 172 */ } ``` With this PR (eliminated lines 56-62 in the above code) ```java /* 047 */ protected void processNext() throws java.io.IOException { /* 048 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 049 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 050 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 051 */ ArrayData inputadapter_value = 
inputadapter_isNull ? null : (inputadapter_row.getArray(0)); /* 052 */ /* 053 */ boolean deserializetoobject_isNull = true; /* 054 */ double[] deserializetoobject_value = null; /* 055 */ if (!inputadapter_isNull) { /* 056 */ deserializetoobject_isNull = false; /* 057 */ if (!deserializetoobject_isNull) { /* 058 */ Object deserializetoobject_funcResult = null; /* 059 */ deserializetoobject_funcResult = inputadapter_value.toDoubleArray(); /* 060 */ if (deserializetoobject_funcResult == null) { /* 061 */ deserializetoobject_isNull = true; /* 062 */ } else { /* 063 */ deserializetoobject_value = (double[]) deserializetoobject_funcResult; /* 064 */ } /* 065 */ /* 066 */ } /* 067 */ deserializetoobject_isNull = deserializetoobject_value == null; /* 068 */ } /* 069 */ /* 070 */ boolean mapelements_isNull = true; /* 071 */ double[] mapelements_value = null; /* 072 */ if (!false) { /* 073 */ mapelements_resultIsNull = false; /* 074 */ /* 075 */ if (!mapelements_resultIsNull) { /* 076 */ mapelements_resultIsNull = deserializetoobject_isNull; /* 077 */ mapelements_argValue = deserializetoobject_value; /* 078 */ } /* 079 */ /* 080 */ mapelements_isNull = mapelements_resultIsNull; /* 081 */ if (!mapelements_isNull) { /* 082 */ Object mapelements_funcResult = null; /* 083 */ mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue); /* 084 */ if (mapelements_funcResult == null) { /* 085 */ mapelements_isNull = true; /* 086 */ } else { /* 087 */ mapelements_value = (double[]) mapelements_funcResult; /* 088 */ } /* 089 */ /* 090 */ } /* 091 */ mapelements_isNull = mapelements_value == null; /* 092 */ } /* 093 */ /* 094 */ serializefromobject_resultIsNull = false; /* 095 */ /* 096 */ if (!serializefromobject_resultIsNull) { /* 097 */ serializefromobject_resultIsNull = mapelements_isNull; /* 098 */ serializefromobject_argValue = mapelements_value; /* 099 */ } /* 100 */ /* 101 */ boolean serializefromobject_isNull = serializefromobject_resultIsNull; /* 102 */ final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue); /* 103 */ serializefromobject_isNull = serializefromobject_value == null; /* 104 */ serializefromobject_holder.reset(); /* 105 */ /* 106 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 107 */ /* 108 */ if (serializefromobject_isNull) { /* 109 */ serializefromobject_rowWriter.setNullAt(0); /* 110 */ } else { /* 111 */ // Remember the current cursor so that we can calculate how many bytes are /* 112 */ // written later. /* 113 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 114 */ /* 115 */ if (serializefromobject_value instanceof UnsafeArrayData) { /* 116 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes(); /* 117 */ // grow the global buffer before writing data. 
/* 118 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 119 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 120 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 121 */ /* 122 */ } else { /* 123 */ final int serializefromobject_numElements = serializefromobject_value.numElements(); /* 124 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8); /* 125 */ /* 126 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) { /* 127 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) { /* 128 */ serializefromobject_arrayWriter.setNullDouble(serializefromobject_index); /* 129 */ } else { /* 130 */ final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index); /* 131 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element); /* 132 */ } /* 133 */ } /* 134 */ } /* 135 */ /* 136 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 137 */ } /* 138 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 139 */ append(serializefromobject_result); /* 140 */ if (shouldStop()) return; /* 141 */ } /* 142 */ } ``` ## How was this patch tested? Add test suites into `DatasetPrimitiveSuite` Author: Kazuaki Ishizaki <[email protected]> Closes #17568 from kiszk/SPARK-20254. (cherry picked from commit e468a96) Signed-off-by: Wenchen Fan <[email protected]>
Thank you very much!!
What changes were proposed in this pull request?
This PR eliminates unnecessary data conversion, introduced by SPARK-19716, for a Dataset with a primitive array in the generated Java code.
When we run the following example program, we currently get the Java code shown under "Without this PR". In this code, lines 56-82 are unnecessary, since the primitive array in `ArrayData` can be converted into a Java primitive array by using the `toDoubleArray()` method; `GenericArrayData` is not required.
Without this PR
With this PR (eliminated lines 56-62 in the above code)
How was this patch tested?
Added test suites into `DatasetPrimitiveSuite`.
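A rough sketch of the kind of check such a test might perform; the test name mirrors the diff shown earlier in this thread, but the body below is an assumption on my part, not the exact code added to `DatasetPrimitiveSuite`:

```scala
test("SPARK-20254: Remove unnecessary data conversion for primitive array") {
  // A Dataset over a primitive array should round-trip through an identity map()
  // unchanged once the redundant MapObjects conversion is eliminated.
  val arrayInt = Array(1, 2, 3)
  val arrayDouble = Array(1.1, 2.2)
  checkDataset(Seq(arrayInt).toDS().map(e => e), arrayInt)
  checkDataset(Seq(arrayDouble).toDS().map(e => e), arrayDouble)
}
```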