Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-24305][SQL][FOLLOWUP] Avoid serialization of private fields in collection expressions. #21352

Closed
wants to merge 12 commits into from
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ case class Reverse(child: Expression) extends UnaryExpression with ImplicitCastI

override def dataType: DataType = child.dataType

lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
@transient
private lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
Copy link
Contributor

@mgaido91 mgaido91 May 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not serialized since it is lazy and it is used only in the eval methods. I think we don't need this change, so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the arrayCodeGen method, isn't it executed before seriealization?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code generation is executed on each partition so I'd say no, it is not executed before serialization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel the benefit of adding @transient is: we don't need to think about if this method is called on executor side only. We never serialize it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mgaido91 Thanks for clarification! I thought that code gets generated and compiled on the driver and sent to each executor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, np. Anyway, if you all think that it is good to add it, then let's do it and ignore my comments related to @transient please.


override def nullSafeEval(input: Any): Any = input match {
case a: ArrayData => new GenericArrayData(a.toObjectArray(elementType).reverse)
Expand Down Expand Up @@ -552,7 +553,8 @@ case class Slice(x: Expression, start: Expression, length: Expression)

override def children: Seq[Expression] = Seq(x, start, length)

lazy val elementType: DataType = x.dataType.asInstanceOf[ArrayType].elementType
@transient
private lazy val elementType: DataType = x.dataType.asInstanceOf[ArrayType].elementType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about genCodeForResult?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as before :)


override def nullSafeEval(xVal: Any, startVal: Any, lengthVal: Any): Any = {
val startInt = startVal.asInstanceOf[Int]
Expand Down Expand Up @@ -837,6 +839,7 @@ case class ArrayMin(child: Expression) extends UnaryExpression with ImplicitCast

override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)

@transient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, here it isn't necessary.

private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)

override def checkInputDataTypes(): TypeCheckResult = {
Expand Down Expand Up @@ -902,6 +905,7 @@ case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCast

override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)

@transient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)

override def checkInputDataTypes(): TypeCheckResult = {
Expand Down Expand Up @@ -1126,9 +1130,9 @@ case class ElementAt(left: Expression, right: Expression) extends GetMapValueUti
""")
case class Concat(children: Seq[Expression]) extends Expression {

private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
private def maxArrayLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH is a static value, what about using directly it in the methods instead of defining a quite useless def?


val allowedTypes = Seq(StringType, BinaryType, ArrayType)
private def allowedTypes: Seq[AbstractDataType] = Seq(StringType, BinaryType, ArrayType)

override def checkInputDataTypes(): TypeCheckResult = {
if (children.isEmpty) {
Expand All @@ -1147,6 +1151,7 @@ case class Concat(children: Seq[Expression]) extends Expression {

override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType)

@transient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doGenCode

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for all this related to code generation, please refer to #21352 (comment)

lazy val javaType: String = CodeGenerator.javaType(dataType)

override def nullable: Boolean = children.exists(_.nullable)
Expand All @@ -1167,9 +1172,9 @@ case class Concat(children: Seq[Expression]) extends Expression {
} else {
val arrayData = inputs.map(_.asInstanceOf[ArrayData])
val numberOfElements = arrayData.foldLeft(0L)((sum, ad) => sum + ad.numElements())
if (numberOfElements > MAX_ARRAY_LENGTH) {
if (numberOfElements > maxArrayLength) {
throw new RuntimeException(s"Unsuccessful try to concat arrays with $numberOfElements" +
s" elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.")
s" elements due to exceeding the array size limit $maxArrayLength.")
}
val finalData = new Array[AnyRef](numberOfElements.toInt)
var position = 0
Expand Down Expand Up @@ -1227,9 +1232,9 @@ case class Concat(children: Seq[Expression]) extends Expression {
|for (int z = 0; z < ${children.length}; z++) {
| $numElements += args[z].numElements();
|}
|if ($numElements > $MAX_ARRAY_LENGTH) {
|if ($numElements > $maxArrayLength) {
| throw new RuntimeException("Unsuccessful try to concat arrays with " + $numElements +
| " elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.");
| " elements due to exceeding the array size limit $maxArrayLength.");
|}
""".stripMargin

Expand Down Expand Up @@ -1324,15 +1329,17 @@ case class Concat(children: Seq[Expression]) extends Expression {
since = "2.4.0")
case class Flatten(child: Expression) extends UnaryExpression {

private val MAX_ARRAY_LENGTH = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
private def maxArrayLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


@transient
private lazy val childDataType: ArrayType = child.dataType.asInstanceOf[ArrayType]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should only use lazy val if it can save some heavy computation. For things like this, I think def is good enough


override def nullable: Boolean = child.nullable || childDataType.containsNull

override def dataType: DataType = childDataType.elementType

lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
@transient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doGenCode

private lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType

override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
case ArrayType(_: ArrayType, _) =>
Expand All @@ -1352,9 +1359,9 @@ case class Flatten(child: Expression) extends UnaryExpression {
} else {
val arrayData = elements.map(_.asInstanceOf[ArrayData])
val numberOfElements = arrayData.foldLeft(0L)((sum, e) => sum + e.numElements())
if (numberOfElements > MAX_ARRAY_LENGTH) {
if (numberOfElements > maxArrayLength) {
throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
s"$numberOfElements elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.")
s"$numberOfElements elements due to exceeding the array size limit $maxArrayLength.")
}
val flattenedData = new Array(numberOfElements.toInt)
var position = 0
Expand Down Expand Up @@ -1401,9 +1408,9 @@ case class Flatten(child: Expression) extends UnaryExpression {
|for (int z = 0; z < $childVariableName.numElements(); z++) {
| $variableName += $childVariableName.getArray(z).numElements();
|}
|if ($variableName > $MAX_ARRAY_LENGTH) {
|if ($variableName > $maxArrayLength) {
| throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
| $variableName + " elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.");
| $variableName + " elements due to exceeding the array size limit $maxArrayLength.");
|}
""".stripMargin
(code, variableName)
Expand Down Expand Up @@ -1483,7 +1490,7 @@ case class Flatten(child: Expression) extends UnaryExpression {
case class ArrayRepeat(left: Expression, right: Expression)
extends BinaryExpression with ExpectsInputTypes {

private val MAX_ARRAY_LENGTH = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
private def maxArrayLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


override def dataType: ArrayType = ArrayType(left.dataType, left.nullable)

Expand All @@ -1496,9 +1503,9 @@ case class ArrayRepeat(left: Expression, right: Expression)
if (count == null) {
null
} else {
if (count.asInstanceOf[Int] > MAX_ARRAY_LENGTH) {
if (count.asInstanceOf[Int] > maxArrayLength) {
throw new RuntimeException(s"Unsuccessful try to create array with $count elements " +
s"due to exceeding the array size limit $MAX_ARRAY_LENGTH.");
s"due to exceeding the array size limit $maxArrayLength.");
}
val element = left.eval(input)
new GenericArrayData(Array.fill(count.asInstanceOf[Int])(element))
Expand Down Expand Up @@ -1557,9 +1564,9 @@ case class ArrayRepeat(left: Expression, right: Expression)
|if ($count > 0) {
| $numElements = $count;
|}
|if ($numElements > $MAX_ARRAY_LENGTH) {
|if ($numElements > $maxArrayLength) {
| throw new RuntimeException("Unsuccessful try to create array with " + $numElements +
| " elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.");
| " elements due to exceeding the array size limit $maxArrayLength.");
|}
""".stripMargin

Expand Down