Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-27134][SQL] array_distinct function does not work correctly with columns containing array of array #24073

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3112,29 +3112,29 @@ case class ArrayDistinct(child: Expression)
(data: Array[AnyRef]) => new GenericArrayData(data.distinct.asInstanceOf[Array[Any]])
} else {
(data: Array[AnyRef]) => {
var foundNullElement = false
var pos = 0
val arrayBuffer = new scala.collection.mutable.ArrayBuffer[AnyRef]
var alreadyStoredNull = false
for (i <- 0 until data.length) {
if (data(i) == null) {
if (!foundNullElement) {
foundNullElement = true
pos = pos + 1
}
} else {
if (data(i) != null) {
var found = false
var j = 0
var done = false
while (j <= i && !done) {
if (data(j) != null && ordering.equiv(data(j), data(i))) {
done = true
}
j = j + 1
while (!found && j < arrayBuffer.size) {
val va = arrayBuffer(j)
found = (va != null) && ordering.equiv(va, data(i))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than handle nulls separately, can you just check it here?

found = if (va == null) data(i) == null else ordering.equiv(...)

It also kind of looks like the ordering already handles nulls?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen Thanks. Actually in my understanding, ordering does not seem to handle nulls. Given that, the proposed condition does not work for case when va != null and data[i] is null. We get a null pointer exception. We can probably do something like :

 if (data(i) != null && va != null) {
     found = ordering.equiv(va, data(i))
  } else if (data(i) == null && va == null) {
     found = true
  }

Given this, i feel the existing code reads better and i think performs better if we have many null and non-null values in the array by keeping the null handling outside the inner loop. Also, i believe, for other collection operation functions we treat nulls separately. But i will change, if you feel otherwise. Please let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, on second look the ordering in ArrayType doesn't handle nulls, only nulls in the array. This is fine. You could save a lookup with...

if (data(i) == null) {
  found = va == null
} else if (va != null) {
  found = ordering.equiv(va, data(i))
}

but I don't feel strongly about it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dilipbiswal explained my intention in the code of null part. @srowen is code simple and easy for reading, but it may include # of iterations if we have already seen null.

j += 1
}
if (i == j - 1) {
pos = pos + 1
if (!found) {
arrayBuffer += data(i)
}
} else {
// De-duplicate the null values.
if (!alreadyStoredNull) {
arrayBuffer += data(i)
alreadyStoredNull = true
}
}
}
new GenericArrayData(data.slice(0, pos))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like original implementation assumes the duplicate items are placed at the end of data? I think it not only affects array of array, but also other element type like BinaryType.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case fails at current master, but passes after this change.

val a8 = Literal.create(Seq(2, 1, 2, 3, 4, 4, 5).map(_.toString.getBytes), ArrayType(BinaryType))
checkEvaluation(new ArrayDistinct(a8), Seq(2, 1, 3, 4, 5).map(_.toString.getBytes))

Copy link
Contributor Author

@dilipbiswal dilipbiswal Mar 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya

Looks like original implementation assumes the duplicate items are placed at the end of data?

Probably. The thing is, it does not rearrange the data in any way. So i don't know how we can just return the slice of the original array.

I think it not only affects array of array, but also other element type like BinaryType

Yeah.. I will add your test case or enhance the Array[Binary] test case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.
Actually I'm not sure how I could miss this.
Thanks!

new GenericArrayData(arrayBuffer)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,8 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
ArrayType(DoubleType))
val a7 = Literal.create(Seq(1.123f, 0.1234f, 1.121f, 1.123f, 1.1230f, 1.121f, 0.1234f),
ArrayType(FloatType))
val a8 =
Literal.create(Seq(2, 1, 2, 3, 4, 4, 5).map(_.toString.getBytes), ArrayType(BinaryType))

checkEvaluation(new ArrayDistinct(a0), Seq(2, 1, 3, 4, 5))
checkEvaluation(new ArrayDistinct(a1), Seq.empty[Integer])
Expand All @@ -1373,6 +1375,7 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
checkEvaluation(new ArrayDistinct(a5), Seq(true, false))
checkEvaluation(new ArrayDistinct(a6), Seq(1.123, 0.1234, 1.121))
checkEvaluation(new ArrayDistinct(a7), Seq(1.123f, 0.1234f, 1.121f))
checkEvaluation(new ArrayDistinct(a8), Seq(2, 1, 3, 4, 5).map(_.toString.getBytes))

// complex data types
val b0 = Literal.create(Seq[Array[Byte]](Array[Byte](5, 6), Array[Byte](1, 2),
Expand All @@ -1393,9 +1396,17 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
ArrayType(ArrayType(IntegerType)))
val c2 = Literal.create(Seq[Seq[Int]](null, Seq[Int](2, 1), null, null, Seq[Int](2, 1), null),
ArrayType(ArrayType(IntegerType)))
val c3 = Literal.create(Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](1, 2), Seq[Int](1, 2),
Seq[Int](3, 4), Seq[Int](4, 5)), ArrayType(ArrayType(IntegerType)))
val c4 = Literal.create(Seq[Seq[Int]](null, Seq[Int](1, 2), Seq[Int](1, 2),
Seq[Int](3, 4), Seq[Int](4, 5), null), ArrayType(ArrayType(IntegerType)))
checkEvaluation(ArrayDistinct(c0), Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](3, 4)))
checkEvaluation(ArrayDistinct(c1), Seq[Seq[Int]](Seq[Int](5, 6), Seq[Int](2, 1)))
checkEvaluation(ArrayDistinct(c2), Seq[Seq[Int]](null, Seq[Int](2, 1)))
checkEvaluation(ArrayDistinct(c3), Seq[Seq[Int]](Seq[Int](1, 2), Seq[Int](3, 4),
Seq[Int](4, 5)))
checkEvaluation(ArrayDistinct(c4), Seq[Seq[Int]](null, Seq[Int](1, 2), Seq[Int](3, 4),
Seq[Int](4, 5)))
}

test("Array Union") {
Expand Down