[SPARK-24305][SQL][FOLLOWUP] Avoid serialization of private fields in collection expressions. #21352
… new collection expressions.
In most cases these are lazy fields that are accessed only during execution, so I don't think they are serialized. And in some cases (I am thinking of the ordering ones) this patch makes us repeat the same computation for each row, which is a performance regression. Honestly, I am not sure we need this. @cloud-fan what do you think?
adding |
@@ -870,6 +870,7 @@ case class ArrayMin(child: Expression) extends UnaryExpression with ImplicitCast

  override protected def nullSafeEval(input: Any): Any = {
    var min: Any = null
    val ordering = TypeUtils.getInterpretedOrdering(dataType)
We can't do this: `nullSafeEval` is called for every input record.
Ok, I will fix it.
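A minimal sketch of the concern above, using a Spark-free stand-in. `ArrayMinSketch` and its `eval` method are hypothetical names for illustration, and `Ordering.Int` stands in for `TypeUtils.getInterpretedOrdering(dataType)`. Holding the ordering in a `@transient lazy val` means it is built at most once per instance, not once per call to the per-record method:

```scala
// Hypothetical, Spark-free stand-in for an expression such as ArrayMin.
class ArrayMinSketch extends Serializable {
  // Built on first access only; not written out when the instance is serialized.
  @transient private lazy val ordering: Ordering[Int] = Ordering.Int

  // Plays the role of nullSafeEval: invoked once per input record.
  def eval(input: Seq[Int]): Option[Int] =
    input.reduceOption((a, b) => if (ordering.lteq(a, b)) a else b)
}
```

Calling `eval` many times on the same instance reuses the same `ordering`, which is the property the review comment asks for.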
@@ -388,7 +388,8 @@ case class Reverse(child: Expression) extends UnaryExpression with ImplicitCastI

  override def dataType: DataType = child.dataType

  lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
  @transient
  private lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
This is not serialized, since it is lazy and is used only in the eval methods, so I don't think we need this change.
What about the `arrayCodeGen` method? Isn't it executed before serialization?
code generation is executed on each partition so I'd say no, it is not executed before serialization.
I feel the benefit of adding `@transient` is that we don't need to think about whether this member is accessed on the executor side only. We never serialize it.
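A rough sketch of the effect described above, assuming Scala 2 and plain Java serialization. `PlainHolder`, `TransientHolder`, and `SerializedSize` are hypothetical names, not Spark classes. Once a plain `lazy val` has been forced, its value travels with the serialized object; marking it `@transient` keeps it out of the serialized form regardless of where it was first accessed:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical holder whose lazy field is serialized once it has been forced.
class PlainHolder(val n: Int) extends Serializable {
  lazy val table: Array[Int] = Array.fill(n)(1)
}

// Same shape, but the lazy field is excluded from serialization.
class TransientHolder(val n: Int) extends Serializable {
  @transient lazy val table: Array[Int] = Array.fill(n)(1)
}

object SerializedSize {
  // Number of bytes produced by Java serialization for the given object.
  def sizeOf(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }
}
```

If both holders are built with `n = 1000` and `table` is forced before measuring, the plain holder serializes to more than 4000 bytes (the array payload), while the transient one stays small and recomputes `table` on first access after deserialization.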
@mgaido91 Thanks for the clarification! I thought that the code gets generated and compiled on the driver and then sent to each executor.
sure, np. Anyway, if you all think it is good to add it, then let's do it; please ignore my comments related to `@transient`.
@@ -552,7 +553,8 @@ case class Slice(x: Expression, start: Expression, length: Expression)

  override def children: Seq[Expression] = Seq(x, start, length)

  lazy val elementType: DataType = x.dataType.asInstanceOf[ArrayType].elementType
  @transient
  private lazy val elementType: DataType = x.dataType.asInstanceOf[ArrayType].elementType
ditto
What about `genCodeForResult`?
same as before :)
@@ -837,6 +839,7 @@ case class ArrayMin(child: Expression) extends UnaryExpression with ImplicitCast

  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)

  @transient
ditto
Yeah, here it isn't necessary.
@@ -902,6 +905,7 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast

  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)

  @transient
ditto
@@ -1147,6 +1151,7 @@ case class Concat(children: Seq[Expression]) extends Expression {

  override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType)

  @transient
ditto
doGenCode
For everything related to code generation, please refer to #21352 (comment).
@@ -1126,9 +1130,9 @@ case class ElementAt(left: Expression, right: Expression) extends GetMapValueUti
  """)
case class Concat(children: Seq[Expression]) extends Expression {

  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
  private def maxArrayLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
Since `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH` is a static value, what about using it directly in the methods instead of defining a rather pointless `def`?
@@ -1324,15 +1329,17 @@ case class Concat(children: Seq[Expression]) extends Expression {
  since = "2.4.0")
case class Flatten(child: Expression) extends UnaryExpression {

  private val MAX_ARRAY_LENGTH = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
  private def maxArrayLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
ditto
@@ -1483,7 +1490,7 @@ case class Flatten(child: Expression) extends UnaryExpression {
case class ArrayRepeat(left: Expression, right: Expression)
  extends BinaryExpression with ExpectsInputTypes {

  private val MAX_ARRAY_LENGTH = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
  private def maxArrayLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
ditto
  private lazy val childDataType: ArrayType = child.dataType.asInstanceOf[ArrayType]

  override def nullable: Boolean = child.nullable || childDataType.containsNull

  override def dataType: DataType = childDataType.elementType

  lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
  @transient
ditto
doGenCode
Test build #90730 has finished for PR 21352 at commit
Test build #90733 has finished for PR 21352 at commit
Test build #90740 has finished for PR 21352 at commit
Test build #90747 has finished for PR 21352 at commit
Jenkins, retest this please.
Test build #90767 has finished for PR 21352 at commit
retest this please
Test build #90781 has finished for PR 21352 at commit
ok to test
Test build #93062 has finished for PR 21352 at commit
@@ -1324,15 +1329,15 @@ case class Concat(children: Seq[Expression]) extends Expression {
  since = "2.4.0")
case class Flatten(child: Expression) extends UnaryExpression {

  private val MAX_ARRAY_LENGTH = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH

  @transient
  private lazy val childDataType: ArrayType = child.dataType.asInstanceOf[ArrayType]
We should only use `lazy val` if it can save some heavy computation. For things like this, I think `def` is good enough.
…ering more expressions.
case ArrayType(
@transient private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = {
child.dataType match {
case ArrayType(
StructType(Array(
the indentation seems wrong here.
LGTM
@@ -3226,7 +3218,7 @@ case class ArrayDistinct(child: Expression)

  override def dataType: DataType = child.dataType

  @transient lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
  private def elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
nit: as this is used in the eval method, with this PR we are re-evaluating this code for each row. Although it is probably not a big issue, I'd rather not introduce a perf regression. WDYT @cloud-fan?
Yea, makes sense.
+1. If it's used in `eval`, let's use `lazy val`, as it will be called for every input. Are there more places like this?
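To make the trade-off concrete, here is a small sketch that counts how often a value is derived. `ElementTypeSketch`, `derive`, and `lookups` are hypothetical names, and the string `"int"` merely stands in for a real `DataType`. A `def` repeats the work on every access, while a `lazy val` does it on first access only:

```scala
// Hypothetical stand-in for an expression; `lookups` counts how often the
// element type is derived.
class ElementTypeSketch {
  var lookups = 0

  private def derive(): String = { lookups += 1; "int" }

  // Re-derived on every access, e.g. once per row when read from eval.
  def elementTypeDef: String = derive()

  // Derived on first access only, then cached in the instance.
  lazy val elementTypeLazy: String = derive()
}
```

Reading `elementTypeDef` three times increments `lookups` three times, whereas reading `elementTypeLazy` three times increments it once. That is why `def` is fine for members touched only occasionally, and `lazy val` is preferable for anything read in the per-row path.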
@mgaido91 Thanks for your point!
Test build #93115 has finished for PR 21352 at commit
Test build #93116 has finished for PR 21352 at commit
  @transient lazy val numberOfArrays: Int = children.length
  override def nullable: Boolean = children.exists(_.nullable)

  @transient private lazy val arrayElementTypes = {
nit: I think we don't need the braces
@@ -360,7 +356,7 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp

  override def inputTypes: Seq[AbstractDataType] = Seq(MapType)

  lazy val childDataType: MapType = child.dataType.asInstanceOf[MapType]
  private def childDataType: MapType = child.dataType.asInstanceOf[MapType]
this should be a lazy val
I missed that one. Thanks!
StructField(_, valueType, valueNullable, _))),
containsNull) => Some((MapType(keyType, valueType, valueNullable), keyNullable, containsNull))
case _ => None
@transient private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = {
this is an unneeded change, isn't it?
Here I wanted to be consistent in terms of formatting (`@transient` on the same line as `private lazy val dataTypeDetails`). After the change, two lines exceeded 100 characters.
I see, but this seems an unneeded change to me and I think there are other places where we use this syntax, so I see no reason to change it
child.dataType match {
case ArrayType(
StructType(Array(
StructField(_, kt, kn, _),
nit: Is there any reason to change variable names? It would be good to minimize differences for review and ease of understanding.
the motivation is described here. I will revert this piece of code shortly.
Test build #93165 has finished for PR 21352 at commit
LGTM
thanks, merging to master!
Test build #93175 has finished for PR 21352 at commit
What changes were proposed in this pull request?
The PR tries to avoid serialization of private fields of already added collection functions and follows up on comments in SPARK-23922 and SPARK-23935
How was this patch tested?
Run tests from: