-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-24305][SQL][FOLLOWUP] Avoid serialization of private fields in collection expressions. #21352
[SPARK-24305][SQL][FOLLOWUP] Avoid serialization of private fields in collection expressions. #21352
Changes from 2 commits
ded67f5
e96962e
f6368b5
2862d3e
a4d1e7f
62c55ad
294ac69
94b86a2
a0abc25
872ef99
fd3a945
922d2f0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -388,7 +388,8 @@ case class Reverse(child: Expression) extends UnaryExpression with ImplicitCastI | |
|
||
override def dataType: DataType = child.dataType | ||
|
||
lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType | ||
@transient | ||
private lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType | ||
|
||
override def nullSafeEval(input: Any): Any = input match { | ||
case a: ArrayData => new GenericArrayData(a.toObjectArray(elementType).reverse) | ||
|
@@ -552,7 +553,8 @@ case class Slice(x: Expression, start: Expression, length: Expression) | |
|
||
override def children: Seq[Expression] = Seq(x, start, length) | ||
|
||
lazy val elementType: DataType = x.dataType.asInstanceOf[ArrayType].elementType | ||
@transient | ||
private lazy val elementType: DataType = x.dataType.asInstanceOf[ArrayType].elementType | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as before :) |
||
|
||
override def nullSafeEval(xVal: Any, startVal: Any, lengthVal: Any): Any = { | ||
val startInt = startVal.asInstanceOf[Int] | ||
|
@@ -837,6 +839,7 @@ case class ArrayMin(child: Expression) extends UnaryExpression with ImplicitCast | |
|
||
override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) | ||
|
||
@transient | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, here it isn't necessary. |
||
private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType) | ||
|
||
override def checkInputDataTypes(): TypeCheckResult = { | ||
|
@@ -902,6 +905,7 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast | |
|
||
override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) | ||
|
||
@transient | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType) | ||
|
||
override def checkInputDataTypes(): TypeCheckResult = { | ||
|
@@ -1126,9 +1130,9 @@ case class ElementAt(left: Expression, right: Expression) extends GetMapValueUti | |
""") | ||
case class Concat(children: Seq[Expression]) extends Expression { | ||
|
||
private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH | ||
private def maxArrayLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since |
||
|
||
val allowedTypes = Seq(StringType, BinaryType, ArrayType) | ||
private def allowedTypes: Seq[AbstractDataType] = Seq(StringType, BinaryType, ArrayType) | ||
|
||
override def checkInputDataTypes(): TypeCheckResult = { | ||
if (children.isEmpty) { | ||
|
@@ -1147,6 +1151,7 @@ case class Concat(children: Seq[Expression]) extends Expression { | |
|
||
override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType) | ||
|
||
@transient | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doGenCode There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for all this related to code generation, please refer to #21352 (comment) |
||
lazy val javaType: String = CodeGenerator.javaType(dataType) | ||
|
||
override def nullable: Boolean = children.exists(_.nullable) | ||
|
@@ -1167,9 +1172,9 @@ case class Concat(children: Seq[Expression]) extends Expression { | |
} else { | ||
val arrayData = inputs.map(_.asInstanceOf[ArrayData]) | ||
val numberOfElements = arrayData.foldLeft(0L)((sum, ad) => sum + ad.numElements()) | ||
if (numberOfElements > MAX_ARRAY_LENGTH) { | ||
if (numberOfElements > maxArrayLength) { | ||
throw new RuntimeException(s"Unsuccessful try to concat arrays with $numberOfElements" + | ||
s" elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.") | ||
s" elements due to exceeding the array size limit $maxArrayLength.") | ||
} | ||
val finalData = new Array[AnyRef](numberOfElements.toInt) | ||
var position = 0 | ||
|
@@ -1227,9 +1232,9 @@ case class Concat(children: Seq[Expression]) extends Expression { | |
|for (int z = 0; z < ${children.length}; z++) { | ||
| $numElements += args[z].numElements(); | ||
|} | ||
|if ($numElements > $MAX_ARRAY_LENGTH) { | ||
|if ($numElements > $maxArrayLength) { | ||
| throw new RuntimeException("Unsuccessful try to concat arrays with " + $numElements + | ||
| " elements due to exceeding the array size limit $MAX_ARRAY_LENGTH."); | ||
| " elements due to exceeding the array size limit $maxArrayLength."); | ||
|} | ||
""".stripMargin | ||
|
||
|
@@ -1324,15 +1329,17 @@ case class Concat(children: Seq[Expression]) extends Expression { | |
since = "2.4.0") | ||
case class Flatten(child: Expression) extends UnaryExpression { | ||
|
||
private val MAX_ARRAY_LENGTH = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH | ||
private def maxArrayLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
|
||
@transient | ||
private lazy val childDataType: ArrayType = child.dataType.asInstanceOf[ArrayType] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should only use |
||
|
||
override def nullable: Boolean = child.nullable || childDataType.containsNull | ||
|
||
override def dataType: DataType = childDataType.elementType | ||
|
||
lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType | ||
@transient | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doGenCode |
||
private lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType | ||
|
||
override def checkInputDataTypes(): TypeCheckResult = child.dataType match { | ||
case ArrayType(_: ArrayType, _) => | ||
|
@@ -1352,9 +1359,9 @@ case class Flatten(child: Expression) extends UnaryExpression { | |
} else { | ||
val arrayData = elements.map(_.asInstanceOf[ArrayData]) | ||
val numberOfElements = arrayData.foldLeft(0L)((sum, e) => sum + e.numElements()) | ||
if (numberOfElements > MAX_ARRAY_LENGTH) { | ||
if (numberOfElements > maxArrayLength) { | ||
throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " + | ||
s"$numberOfElements elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.") | ||
s"$numberOfElements elements due to exceeding the array size limit $maxArrayLength.") | ||
} | ||
val flattenedData = new Array(numberOfElements.toInt) | ||
var position = 0 | ||
|
@@ -1401,9 +1408,9 @@ case class Flatten(child: Expression) extends UnaryExpression { | |
|for (int z = 0; z < $childVariableName.numElements(); z++) { | ||
| $variableName += $childVariableName.getArray(z).numElements(); | ||
|} | ||
|if ($variableName > $MAX_ARRAY_LENGTH) { | ||
|if ($variableName > $maxArrayLength) { | ||
| throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " + | ||
| $variableName + " elements due to exceeding the array size limit $MAX_ARRAY_LENGTH."); | ||
| $variableName + " elements due to exceeding the array size limit $maxArrayLength."); | ||
|} | ||
""".stripMargin | ||
(code, variableName) | ||
|
@@ -1483,7 +1490,7 @@ case class Flatten(child: Expression) extends UnaryExpression { | |
case class ArrayRepeat(left: Expression, right: Expression) | ||
extends BinaryExpression with ExpectsInputTypes { | ||
|
||
private val MAX_ARRAY_LENGTH = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH | ||
private def maxArrayLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
|
||
override def dataType: ArrayType = ArrayType(left.dataType, left.nullable) | ||
|
||
|
@@ -1496,9 +1503,9 @@ case class ArrayRepeat(left: Expression, right: Expression) | |
if (count == null) { | ||
null | ||
} else { | ||
if (count.asInstanceOf[Int] > MAX_ARRAY_LENGTH) { | ||
if (count.asInstanceOf[Int] > maxArrayLength) { | ||
throw new RuntimeException(s"Unsuccessful try to create array with $count elements " + | ||
s"due to exceeding the array size limit $MAX_ARRAY_LENGTH."); | ||
s"due to exceeding the array size limit $maxArrayLength."); | ||
} | ||
val element = left.eval(input) | ||
new GenericArrayData(Array.fill(count.asInstanceOf[Int])(element)) | ||
|
@@ -1557,9 +1564,9 @@ case class ArrayRepeat(left: Expression, right: Expression) | |
|if ($count > 0) { | ||
| $numElements = $count; | ||
|} | ||
|if ($numElements > $MAX_ARRAY_LENGTH) { | ||
|if ($numElements > $maxArrayLength) { | ||
| throw new RuntimeException("Unsuccessful try to create array with " + $numElements + | ||
| " elements due to exceeding the array size limit $MAX_ARRAY_LENGTH."); | ||
| " elements due to exceeding the array size limit $maxArrayLength."); | ||
|} | ||
""".stripMargin | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not serialized since it is lazy and is used only in the eval methods, so I think we don't need this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about the
arrayCodeGen
method, isn't it executed before serialization? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
code generation is executed on each partition so I'd say no, it is not executed before serialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel the benefit of adding
@transient
is: we don't need to think about whether this method is called only on the executor side. We never serialize it. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mgaido91 Thanks for the clarification! I thought that code gets generated and compiled on the driver and sent to each executor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, np. Anyway, if you all think that it is good to add it, then let's do it and ignore my comments related to
@transient
please.