diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 7af42295373b5..ef1d12531f109 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -156,6 +156,12 @@ class JoinedRow extends Row {
 
 /**
  * JIT HACK: Replace with macros
+ * The `JoinedRow` class is used in many performance critical situations. Unfortunately, since there
+ * are multiple different types of `Rows` that could be stored as `row1` and `row2`, most of the
+ * calls in the critical path are polymorphic. By creating special versions of this class that are
+ * used in only a single location of the code, we increase the chance that only a single type of
+ * Row will be referenced, increasing the opportunity for the JIT to play tricks. This sounds
+ * crazy but in benchmarks it had noticeable effects.
  */
 class JoinedRow2 extends Row {
   private[this] var row1: Row = _
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala
index 4a7a563cce5e8..75ea0e8459df8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala
@@ -20,42 +20,43 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.types._
 
 /**
+ * A parent class for mutable container objects that are reused when the values are changed,
+ * resulting in less garbage. These values are held by a [[SpecificMutableRow]].
  *
+ * The following code was roughly used to generate these objects:
+ * {{{
+ * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
+ * types.map {tpe =>
+ * s"""
+ * final class Mutable$tpe extends MutableValue {
+ *   var value: $tpe = 0
+ *   def boxed = if (isNull) null else value
+ *   def update(v: Any) = value = {
+ *     isNull = false
+ *     v.asInstanceOf[$tpe]
+ *   }
+ *   def copy() = {
+ *     val newCopy = new Mutable$tpe
+ *     newCopy.isNull = isNull
+ *     newCopy.value = value
+ *     newCopy.asInstanceOf[this.type]
+ *   }
+ * }"""
+ * }.foreach(println)
  *
+ * types.map { tpe =>
+ * s"""
+ *   override def set$tpe(ordinal: Int, value: $tpe): Unit = {
+ *     val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
+ *     currentValue.isNull = false
+ *     currentValue.value = value
+ *   }
  *
-{{{
-val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
-types.map {tpe =>
-s"""
-final class Mutable$tpe extends MutableValue {
-  var value: $tpe = 0
-  def boxed = if (isNull) null else value
-  def update(v: Any) = value = {
-    isNull = false
-    v.asInstanceOf[$tpe]
-  }
-  def copy() = {
-    val newCopy = new Mutable$tpe
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy.asInstanceOf[this.type]
-  }
-}"""
-}.foreach(println)
-
-types.map { tpe =>
-s"""
-  override def set$tpe(ordinal: Int, value: $tpe): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def get$tpe(i: Int): $tpe = {
-    values(i).asInstanceOf[Mutable$tpe].value
-  }"""
-}.foreach(println)
-}}}
+ *   override def get$tpe(i: Int): $tpe = {
+ *     values(i).asInstanceOf[Mutable$tpe].value
+ *   }"""
+ * }.foreach(println)
+ * }}}
  */
 abstract class MutableValue extends Serializable {
   var isNull: Boolean = true
@@ -184,7 +185,12 @@ final class MutableAny extends MutableValue {
   }
 }
 
-class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow {
+/**
+ * A row type that holds an array of specialized container objects, of type [[MutableValue]], chosen
+ * based on the dataTypes of each column. The intent is to decrease garbage when modifying the
+ * values of primitive columns.
+ */
+final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow {
 
   def this(dataTypes: Seq[DataType]) =
     this(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 923a9b1445d6b..8d90614e4501a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -93,12 +93,25 @@ case class MaxOf(left: Expression, right: Expression) extends Expression {
 
   override def children = left :: right :: Nil
 
-  override def references = (left.flatMap(_.references) ++ right.flatMap(_.references)).toSet
+  override def references = left.references ++ right.references
 
   override def dataType = left.dataType
 
   override def eval(input: Row): Any = {
     val leftEval = left.eval(input)
+    val rightEval = right.eval(input)
+    if (leftEval == null) {
+      rightEval
+    } else if (rightEval == null) {
+      leftEval
+    } else {
+      val numeric = left.dataType.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]
+      if (numeric.compare(leftEval, rightEval) < 0) {
+        rightEval
+      } else {
+        leftEval
+      }
+    }
   }
 
   override def toString = s"MaxOf($left, $right)"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 7362d3d6b0e7c..f06b8c78a1be9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 
+// These classes are here to avoid issues with serialization and integration with quasiquotes.
 class IntegerHashSet extends org.apache.spark.util.collection.OpenHashSet[Int]
 class LongHashSet extends org.apache.spark.util.collection.OpenHashSet[Long]
 
@@ -53,6 +54,11 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
   private val curId = new java.util.concurrent.atomic.AtomicInteger()
   private val javaSeparator = "$"
 
+  /**
+   * Can be flipped on manually in the console to add (expensive) expression evaluation trace code.
+   */
+  var debugLogging = false
+
   /**
    * Generates a class for a given input expression. Called when there is not cached code
    * already available.
@@ -496,7 +502,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
 
     // Only inject debugging code if debugging is turned on.
     val debugCode =
-      if (false) {
+      if (debugLogging) {
         val localLogger = log
         val localLoggerTree = reify { localLogger }
         q"""
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
index 999c9fff38d60..f1df817c41362 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -136,6 +136,16 @@ class ExpressionEvaluationSuite extends FunSuite {
     checkEvaluation(In(Literal(1), Seq(Literal(1), Literal(2))) && In(Literal(2), Seq(Literal(1), Literal(2))), true)
   }
 
+  test("MaxOf") {
+    checkEvaluation(MaxOf(1, 2), 2)
+    checkEvaluation(MaxOf(2, 1), 2)
+    checkEvaluation(MaxOf(1L, 2L), 2L)
+    checkEvaluation(MaxOf(2L, 1L), 2L)
+
+    checkEvaluation(MaxOf(Literal(null, IntegerType), 2), 2)
+    checkEvaluation(MaxOf(2, Literal(null, IntegerType)), 2)
+  }
+
   test("LIKE literal Regular Expression") {
     checkEvaluation(Literal(null, StringType).like("a"), null)
     checkEvaluation(Literal("a", StringType).like(Literal(null, StringType)), null)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 206f514e9de00..2653871c2a7eb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -158,7 +158,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
       case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
     }
     assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold,
-      s"query should contain two relations, each of which has size smaller than autoConvertSize instead ${rdd.queryExecution}")
+      s"query should contain two relations, each of which has size smaller than autoConvertSize")
 
     // Using `sparkPlan` because for relevant patterns in HashJoin to be
     // matched, other strategies need to be applied.
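As a reviewer aid (not part of the patch), here is a rough, self-contained sketch of how the new `SpecificMutableRow` might be exercised. The column types and values are illustrative assumptions; `setInt`/`setLong`/`getInt`/`getLong` follow the setters and getters generated by the scaladoc snippet above, while `setNullAt`/`isNullAt` are assumed to come from the existing `MutableRow`/`Row` interface.

```scala
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.catalyst.types.{IntegerType, LongType}

object SpecificMutableRowSketch {
  def main(args: Array[String]): Unit = {
    // One specialized MutableValue container is allocated per column up front;
    // later updates mutate those containers in place instead of boxing new values.
    val row = new SpecificMutableRow(Seq(IntegerType, LongType))

    row.setInt(0, 42)        // backed by a MutableInt, no boxing on update
    row.setLong(1, 1234L)    // backed by a MutableLong

    println(row.getInt(0))   // 42
    println(row.getLong(1))  // 1234

    row.setNullAt(0)         // marks the container as null without reallocating
    println(row.isNullAt(0)) // true
  }
}
```

The point of the specialized containers is that repeated `setInt`/`setLong` calls reuse the same `MutableInt`/`MutableLong` objects, so hot loops that overwrite row values generate less garbage, in line with the comments added in this patch.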