
[SPARK-3891][SQL] Add array support to percentile, percentile_approx and constant inspectors support #2802

Closed · wants to merge 8 commits
@@ -107,7 +107,9 @@ case class GetField(child: Expression, fieldName: String) extends UnaryExpressio
*/
case class CreateArray(children: Seq[Expression]) extends Expression {
override type EvaluatedType = Any


+ override def foldable = !children.exists(!_.foldable)

lazy val childTypes = children.map(_.dataType).distinct

override lazy val resolved =
35 changes: 25 additions & 10 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -164,6 +164,11 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq
override def foldable =
isUDFDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector]

+ @transient
+ protected def constantReturnValue = unwrap(
+   returnInspector.asInstanceOf[ConstantObjectInspector].getWritableConstantValue(),
+   returnInspector)

@transient
protected lazy val deferedObjects =
argumentInspectors.map(new DeferredObjectAdapter(_)).toArray[DeferredObject]
@@ -172,6 +177,8 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq

override def eval(input: Row): Any = {
returnInspector // Make sure initialized.
+ if (foldable) return constantReturnValue
Contributor:

This is probably unnecessary, as the constant folding rule in the Optimizer will replace the foldable expression with a Literal. Please correct me if there is an exceptional case. (The same applies to the constantReturnValue property above.)

Contributor Author:

The constant check and early return are required for two reasons:

  1. Some UDFs return a constant inspector when initializeAndFoldConstants is called with constant inspectors, by executing the function once. But if the same UDF is then invoked through function.evaluate, it does not return a value of the same type: the data type expected by the constant return inspector does not match the data type returned by function.evaluate (e.g. org.apache.hadoop.io.Text vs. String), which makes unwrap fail. So if the return inspector is constant, we should not call function.evaluate at all; the expression has already been evaluated and the return value is already stored in the constant inspector. I uncovered this when I made CreateArray foldable and a test failed, so it is modified as part of this defect fix.
  2. Even if the Literal is identified during optimization, the expression is evaluated twice: once during return inspector creation and again in the eval function.
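The short-circuit the author describes can be sketched with a self-contained toy model. All names below are hypothetical stand-ins, not the real Hive ObjectInspector API; the point is only that the constant value is computed once while the return inspector is built, and eval hands it back without re-running the function.

```scala
// Toy model of a constant return inspector (hypothetical names, not the
// real Hive API). The value is computed once when the inspector is built;
// eval() short-circuits instead of re-running the function, whose second
// run could yield a differently typed value (e.g. Text vs. String).
sealed trait Inspector
case class ConstantInspector(value: Any) extends Inspector
case object PlainInspector extends Inspector

class ModelUdf(f: Seq[Any] => Any, constantArgs: Option[Seq[Any]]) {
  var evaluations = 0
  private def run(args: Seq[Any]): Any = { evaluations += 1; f(args) }

  // Analogous to initializeAndFoldConstants: fold once if all args are constant.
  val returnInspector: Inspector =
    constantArgs.map(a => ConstantInspector(run(a))).getOrElse(PlainInspector)

  def foldable: Boolean = returnInspector.isInstanceOf[ConstantInspector]

  def eval(args: Seq[Any]): Any = returnInspector match {
    case ConstantInspector(v) => v        // already evaluated during init
    case PlainInspector       => run(args)
  }
}

val udf = new ModelUdf(args => args.head.toString.toUpperCase, Some(Seq("spark")))
```

Repeated eval calls leave evaluations at 1, which mirrors reason 2 above; without the short-circuit the underlying function would run once per row.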

Contributor:

Thanks for the explanation. I guess there is probably a bug in master as you described; can you paste a query to reproduce the failure (Text vs. String)?

Contributor Author:

In HiveQuerySuite, the "constant array" test case was failing:

SELECT sort_array(
  sort_array(
    array("hadoop distributed file system",
      "enterprise databases", "hadoop map-reduce")))
FROM src LIMIT 1

[info] - constant array *** FAILED *** (596 milliseconds)
[info] Failed to execute query using catalyst:
[info] Error: java.lang.String cannot be cast to org.apache.hadoop.io.Text

Contributor:

@chenghao-intel, I think you understand this code better than I do. Are you satisfied with the explanation? Does this approach seem reasonable?

Contributor:

I think this failure is due to the bug in the nested constant Expression <-> ObjectInspector conversion in HiveInspectors, and I will fix that in #3429.

@gvramana, how about reverting the changes in CreateArray and HiveGenericUdf? I think we can merge the others first, and you can create a new PR for CreateArray.foldable that depends on #3429, since it currently doesn't break anything.

Contributor Author:

@chenghao-intel This PR cannot be merged separately without CreateArray, as percentile_approx accepts only a constant array inspector and fails otherwise. I think we can go ahead and merge all of these changes: they don't break the build or tests, and they don't depend on #3429 in merge order.

Contributor:

I've updated #3429; I think this PR can be made simpler after #3429 is merged. Besides, invoking foldable within eval is probably too heavy; that check is supposed to be eliminated in the Optimizer.
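For context, the Optimizer behavior referred to here can be sketched as a minimal constant-folding pass. This is a simplified stand-in for Catalyst's ConstantFolding rule, not the real API: every foldable subtree is evaluated once and replaced by a Literal, so a foldable check inside eval would never fire after optimization.

```scala
// Minimal constant folding over a toy expression tree (simplified model,
// not Catalyst). A foldable subtree is evaluated once and replaced by a
// Literal; non-foldable parts are left for runtime evaluation.
sealed trait Expr { def foldable: Boolean }
case class Literal(value: Int) extends Expr { val foldable = true }
case class Attr(name: String) extends Expr { val foldable = false }
case class Add(left: Expr, right: Expr) extends Expr {
  def foldable = left.foldable && right.foldable
}

def evalConst(e: Expr): Int = e match {
  case Literal(v) => v
  case Add(l, r)  => evalConst(l) + evalConst(r)
  case Attr(n)    => sys.error(s"cannot evaluate unbound attribute $n")
}

def constantFold(e: Expr): Expr = e match {
  case e if e.foldable => Literal(evalConst(e))                  // fold whole subtree
  case Add(l, r)       => Add(constantFold(l), constantFold(r))  // recurse
  case other           => other
}
```

For example, constantFold(Add(Add(Literal(1), Literal(2)), Attr("key"))) yields Add(Literal(3), Attr("key")): the constant half is pre-computed while the attribute survives to runtime.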


var i = 0
while (i < children.length) {
val idx = i
@@ -198,12 +205,13 @@ private[hive] case class HiveGenericUdaf(

@transient
protected lazy val objectInspector = {
- resolver.getEvaluator(children.map(_.dataType.toTypeInfo).toArray)
+ val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors.toArray, false, false)
+ resolver.getEvaluator(parameterInfo)
.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray)
}

@transient
- protected lazy val inspectors = children.map(_.dataType).map(toInspector)
+ protected lazy val inspectors = children.map(toInspector)

def dataType: DataType = inspectorToDataType(objectInspector)

@@ -228,12 +236,13 @@ private[hive] case class HiveUdaf(

@transient
protected lazy val objectInspector = {
- resolver.getEvaluator(children.map(_.dataType.toTypeInfo).toArray)
+ val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors.toArray, false, false)
+ resolver.getEvaluator(parameterInfo)
.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray)
}

@transient
- protected lazy val inspectors = children.map(_.dataType).map(toInspector)
+ protected lazy val inspectors = children.map(toInspector)

def dataType: DataType = inspectorToDataType(objectInspector)

@@ -266,7 +275,7 @@ private[hive] case class HiveGenericUdtf(
protected lazy val function: GenericUDTF = createFunction()

@transient
- protected lazy val inputInspectors = children.map(_.dataType).map(toInspector)
+ protected lazy val inputInspectors = children.map(toInspector)

@transient
protected lazy val outputInspector = function.initialize(inputInspectors.toArray)
@@ -340,10 +349,13 @@ private[hive] case class HiveUdafFunction(
} else {
createFunction[AbstractGenericUDAFResolver]()
}

- private val inspectors = exprs.map(_.dataType).map(toInspector).toArray
-
- private val function = resolver.getEvaluator(exprs.map(_.dataType.toTypeInfo).toArray)
+ private val inspectors = exprs.map(toInspector).toArray
+
+ private val function = {
+   val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
+   resolver.getEvaluator(parameterInfo)
+ }

private val returnInspector = function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)

@@ -356,8 +368,11 @@ private[hive] case class HiveUdafFunction(
@transient
val inputProjection = new InterpretedProjection(exprs)

+ @transient
+ protected lazy val cached = new Array[AnyRef](exprs.length)

def update(input: Row): Unit = {
val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray
- function.iterate(buffer, inputs)
+ function.iterate(buffer, wrap(inputs, inspectors, cached))
}
}
@@ -85,10 +85,21 @@ class HiveUdfSuite extends QueryTest {
}

test("SPARK-2693 udaf aggregates test") {
- checkAnswer(sql("SELECT percentile(key,1) FROM src LIMIT 1"),
+ checkAnswer(sql("SELECT percentile(key, 1) FROM src LIMIT 1"),
sql("SELECT max(key) FROM src").collect().toSeq)

+ checkAnswer(sql("SELECT percentile(key, array(1, 1)) FROM src LIMIT 1"),
+   sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq)
}

+ test("Generic UDAF aggregates") {
+   checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.99999)) FROM src LIMIT 1"),
+     sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq)
+
+   checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9, 0.9)) FROM src LIMIT 1"),
+     sql("SELECT array(100, 100) FROM src LIMIT 1").collect().toSeq)
+ }

test("UDFIntegerToString") {
val testData = TestHive.sparkContext.parallelize(
IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil)