From 1e76669f583d30cb7f8ec091e965bf155732edfc Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 12 Mar 2019 14:42:29 -0700 Subject: [PATCH 1/7] [SPARK-27134] array_distinct function does not work correctly with columns containing array of array --- .../expressions/collectionOperations.scala | 35 ++++++++++--------- .../CollectionExpressionsSuite.scala | 4 +++ 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 6cc5bc85bcfb4..74a0ce5623e9a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -3088,6 +3088,7 @@ case class ArrayDistinct(child: Expression) override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + override def dataType: DataType = child.dataType @transient private lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType @@ -3112,29 +3113,29 @@ case class ArrayDistinct(child: Expression) (data: Array[AnyRef]) => new GenericArrayData(data.distinct.asInstanceOf[Array[Any]]) } else { (data: Array[AnyRef]) => { - var foundNullElement = false - var pos = 0 + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + var alreadyStoredNull = false + var found = false for (i <- 0 until data.length) { - if (data(i) == null) { - if (!foundNullElement) { - foundNullElement = true - pos = pos + 1 + if (data(i) != null) { + found = false + var j = 0; + while (!found && j < arrayBuffer.size) { + val va = arrayBuffer(j) + found = (va != null) && ordering.equiv(va, data(i)) + j += 1 } - } else { - var j = 0 - var done = false - while (j <= i && !done) { - if (data(j) != null && ordering.equiv(data(j), data(i))) { - done = true - } - j = j + 1 + if (!found) { + arrayBuffer += data(i) } - if (i == j - 1) { - pos = pos + 1 + } else { + if (!alreadyStoredNull) { + arrayBuffer += data(i) + alreadyStoredNull = true } } } - new GenericArrayData(data.slice(0, pos)) + new GenericArrayData(arrayBuffer) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala index c1c045938cad8..e4f2eb84076bb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala @@ -1393,9 +1393,13 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper ArrayType(ArrayType(IntegerType))) val c2 = Literal.create(Seq[Seq[Int]](null, Seq[Int](2, 1), null, null, Seq[Int](2, 1), null), ArrayType(ArrayType(IntegerType))) + val c4 = Literal.create(Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](1, 2), Seq[Int](1, 2), + Seq[Int](3, 4), Seq[Int](4, 5)), ArrayType(ArrayType(IntegerType))) checkEvaluation(ArrayDistinct(c0), Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](3, 4))) checkEvaluation(ArrayDistinct(c1), Seq[Seq[Int]](Seq[Int](5, 6), Seq[Int](2, 1))) checkEvaluation(ArrayDistinct(c2), Seq[Seq[Int]](null, Seq[Int](2, 1))) + checkEvaluation(ArrayDistinct(c4), Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](3, 4), + Seq[Int](4, 5))) } test("Array Union") { From 860ed8700f6201e61fe8b6e293d34c7a27d4cfa4 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 12 Mar 2019 14:44:46 -0700 Subject: [PATCH 2/7] remove space --- .../spark/sql/catalyst/expressions/collectionOperations.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 74a0ce5623e9a..d9eb62adee839 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -3087,8 +3087,7 @@ case class ArrayDistinct(child: Expression) extends UnaryExpression with ArraySetLike with ExpectsInputTypes { override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) - - + override def dataType: DataType = child.dataType @transient private lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType From 049bf9b28072b8e6c862123a664d89762017c49c Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 12 Mar 2019 15:20:02 -0700 Subject: [PATCH 3/7] style and one more test --- .../spark/sql/catalyst/expressions/collectionOperations.scala | 3 ++- .../sql/catalyst/expressions/CollectionExpressionsSuite.scala | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index d9eb62adee839..4d537cc0adfea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -3087,7 +3087,7 @@ case class ArrayDistinct(child: Expression) extends UnaryExpression with ArraySetLike with ExpectsInputTypes { override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) - + override def dataType: DataType = child.dataType @transient private lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType @@ -3128,6 +3128,7 @@ case class ArrayDistinct(child: Expression) arrayBuffer += data(i) } } else { + // De-duplicate the null values. if (!alreadyStoredNull) { arrayBuffer += data(i) alreadyStoredNull = true diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala index e4f2eb84076bb..f43db6604d197 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala @@ -1395,11 +1395,15 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper ArrayType(ArrayType(IntegerType))) val c4 = Literal.create(Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](1, 2), Seq[Int](1, 2), Seq[Int](3, 4), Seq[Int](4, 5)), ArrayType(ArrayType(IntegerType))) + val c5 = Literal.create(Seq[Seq[Int]](null, Seq[Int](1, 2), Seq[Int](1, 2), + Seq[Int](3, 4), Seq[Int](4, 5), null), ArrayType(ArrayType(IntegerType))) checkEvaluation(ArrayDistinct(c0), Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](3, 4))) checkEvaluation(ArrayDistinct(c1), Seq[Seq[Int]](Seq[Int](5, 6), Seq[Int](2, 1))) checkEvaluation(ArrayDistinct(c2), Seq[Seq[Int]](null, Seq[Int](2, 1))) checkEvaluation(ArrayDistinct(c4), Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](3, 4), Seq[Int](4, 5))) + checkEvaluation(ArrayDistinct(c5), Seq[Seq[Int]](null, Seq[Int](1, 2), Seq[Int](3, 4), + Seq[Int](4, 5))) } test("Array Union") { From d59b0f70d6844b9b763fcd7de77309fdd18658d8 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 12 Mar 2019 15:55:53 -0700 Subject: [PATCH 4/7] variable names --- .../catalyst/expressions/CollectionExpressionsSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala index f43db6604d197..00adae14b89b5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala @@ -1393,16 +1393,16 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper ArrayType(ArrayType(IntegerType))) val c2 = Literal.create(Seq[Seq[Int]](null, Seq[Int](2, 1), null, null, Seq[Int](2, 1), null), ArrayType(ArrayType(IntegerType))) - val c4 = Literal.create(Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](1, 2), Seq[Int](1, 2), + val c3 = Literal.create(Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](1, 2), Seq[Int](1, 2), Seq[Int](3, 4), Seq[Int](4, 5)), ArrayType(ArrayType(IntegerType))) - val c5 = Literal.create(Seq[Seq[Int]](null, Seq[Int](1, 2), Seq[Int](1, 2), + val c4 = Literal.create(Seq[Seq[Int]](null, Seq[Int](1, 2), Seq[Int](1, 2), Seq[Int](3, 4), Seq[Int](4, 5), null), ArrayType(ArrayType(IntegerType))) checkEvaluation(ArrayDistinct(c0), Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](3, 4))) checkEvaluation(ArrayDistinct(c1), Seq[Seq[Int]](Seq[Int](5, 6), Seq[Int](2, 1))) checkEvaluation(ArrayDistinct(c2), Seq[Seq[Int]](null, Seq[Int](2, 1))) - checkEvaluation(ArrayDistinct(c4), Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](3, 4), + checkEvaluation(ArrayDistinct(c3), Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](3, 4), Seq[Int](4, 5))) - checkEvaluation(ArrayDistinct(c5), Seq[Seq[Int]](null, Seq[Int](1, 2), Seq[Int](3, 4), + checkEvaluation(ArrayDistinct(c4), Seq[Seq[Int]](null, Seq[Int](1, 2), Seq[Int](3, 4), Seq[Int](4, 5))) } From dfdd109e909d4176a715870f2dbc9e617de11973 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 13 Mar 2019 17:17:58 -0700 Subject: [PATCH 5/7] Code review --- .../sql/catalyst/expressions/collectionOperations.scala | 7 +++---- .../catalyst/expressions/CollectionExpressionsSuite.scala | 3 +++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 4d537cc0adfea..f7955bc923395 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -3112,13 +3112,12 @@ case class ArrayDistinct(child: Expression) (data: Array[AnyRef]) => new GenericArrayData(data.distinct.asInstanceOf[Array[Any]]) } else { (data: Array[AnyRef]) => { - val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[AnyRef] var alreadyStoredNull = false - var found = false for (i <- 0 until data.length) { if (data(i) != null) { - found = false - var j = 0; + var found = false + var j = 0 while (!found && j < arrayBuffer.size) { val va = arrayBuffer(j) found = (va != null) && ordering.equiv(va, data(i)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala index 00adae14b89b5..603073b40d7aa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala @@ -1364,6 +1364,8 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper ArrayType(DoubleType)) val a7 = Literal.create(Seq(1.123f, 0.1234f, 1.121f, 1.123f, 1.1230f, 1.121f, 0.1234f), ArrayType(FloatType)) + val a8 = + Literal.create(Seq(2, 1, 2, 3, 4, 4, 5).map(_.toString.getBytes), ArrayType(BinaryType)) checkEvaluation(new ArrayDistinct(a0), Seq(2, 1, 3, 4, 5)) checkEvaluation(new ArrayDistinct(a1), Seq.empty[Integer]) @@ -1373,6 +1375,7 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(new ArrayDistinct(a5), Seq(true, false)) checkEvaluation(new ArrayDistinct(a6), Seq(1.123, 0.1234, 1.121)) checkEvaluation(new ArrayDistinct(a7), Seq(1.123f, 0.1234f, 1.121f)) + checkEvaluation(new ArrayDistinct(a8), Seq(2, 1, 3, 4, 5).map(_.toString.getBytes)) // complex data types val b0 = Literal.create(Seq[Array[Byte]](Array[Byte](5, 6), Array[Byte](1, 2), From 60eb195656119181a64644148d4499eacaa25e0c Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Thu, 14 Mar 2019 14:34:38 -0700 Subject: [PATCH 6/7] Code review --- .../expressions/collectionOperations.scala | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index f7955bc923395..e671373469639 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -3113,25 +3113,21 @@ case class ArrayDistinct(child: Expression) } else { (data: Array[AnyRef]) => { val arrayBuffer = new scala.collection.mutable.ArrayBuffer[AnyRef] - var alreadyStoredNull = false for (i <- 0 until data.length) { - if (data(i) != null) { - var found = false - var j = 0 - while (!found && j < arrayBuffer.size) { - val va = arrayBuffer(j) - found = (va != null) && ordering.equiv(va, data(i)) - j += 1 - } - if (!found) { - arrayBuffer += data(i) - } - } else { - // De-duplicate the null values. - if (!alreadyStoredNull) { - arrayBuffer += data(i) - alreadyStoredNull = true + var found = false + var j = 0 + while (!found && j < arrayBuffer.size) { + val va = arrayBuffer(j) + if (data(i) == null) { + // De-duplicate null values. + found = va == null + } else if (va != null) { + found = ordering.equiv(va, data(i)) } + j += 1 + } + if (!found) { + arrayBuffer += data(i) } } new GenericArrayData(arrayBuffer) From 1bde41c49528b0b58329f07fd3daef84fa411081 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Thu, 14 Mar 2019 23:01:49 -0700 Subject: [PATCH 7/7] Revert "Code review" This reverts commit 60eb195656119181a64644148d4499eacaa25e0c. --- .../expressions/collectionOperations.scala | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index e671373469639..f7955bc923395 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -3113,21 +3113,25 @@ case class ArrayDistinct(child: Expression) } else { (data: Array[AnyRef]) => { val arrayBuffer = new scala.collection.mutable.ArrayBuffer[AnyRef] + var alreadyStoredNull = false for (i <- 0 until data.length) { - var found = false - var j = 0 - while (!found && j < arrayBuffer.size) { - val va = arrayBuffer(j) - if (data(i) == null) { - // De-duplicate null values. - found = va == null - } else if (va != null) { - found = ordering.equiv(va, data(i)) + if (data(i) != null) { + var found = false + var j = 0 + while (!found && j < arrayBuffer.size) { + val va = arrayBuffer(j) + found = (va != null) && ordering.equiv(va, data(i)) + j += 1 + } + if (!found) { + arrayBuffer += data(i) + } + } else { + // De-duplicate the null values. + if (!alreadyStoredNull) { + arrayBuffer += data(i) + alreadyStoredNull = true } - j += 1 - } - if (!found) { - arrayBuffer += data(i) } } new GenericArrayData(arrayBuffer)