[SPARK-27134][SQL] array_distinct function does not work correctly with columns containing array of array #24073
Conversation
…lumns containing array of array
Test build #103388 has finished for PR 24073 at commit

Test build #103391 has finished for PR 24073 at commit

Test build #103395 has finished for PR 24073 at commit

retest this please

Test build #103409 has finished for PR 24073 at commit

retest this please

Can you put an example query to reproduce this bug in the PR description?

cc @ueshin

Thanks, let me see later.

@maropu Updated the PR description. Thank you.
```scala
      }
    }
  }
  new GenericArrayData(data.slice(0, pos))
```
Looks like the original implementation assumes the duplicate items are placed at the end of `data`? I think it not only affects arrays of arrays, but also other element types like `BinaryType`.
This test case fails on current master, but passes after this change.

```scala
val a8 = Literal.create(Seq(2, 1, 2, 3, 4, 4, 5).map(_.toString.getBytes), ArrayType(BinaryType))
checkEvaluation(new ArrayDistinct(a8), Seq(2, 1, 3, 4, 5).map(_.toString.getBytes))
```
> Looks like original implementation assumes the duplicate items are placed at the end of data?

Probably. The thing is, it does not rearrange the data in any way, so I don't know how we can just return a slice of the original array.

> I think it not only affects array of array, but also other element type like BinaryType

Yeah, I will add your test case or enhance the `Array[Binary]` test case.
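To make the failure mode concrete, here is a minimal standalone sketch (the input and the `pos` value are illustrative, not taken from the PR): when the loop merely counts distinct elements without compacting `data` in place, `data.slice(0, pos)` returns the leading entries of the original array, duplicates and all.

```scala
// Hypothetical input where the duplicates are not at the end.
val data: Array[AnyRef] =
  Array(Seq(1, 2), Seq(1, 2), Seq(1, 2), Seq(3, 4), Seq(4, 5))
    .map(_.asInstanceOf[AnyRef])
val pos = 3 // three distinct elements were counted by the scan

// Slicing keeps the first `pos` entries of `data` unchanged:
val result = data.slice(0, pos) // Seq(1, 2), Seq(1, 2), Seq(1, 2) -- still duplicated
```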
Good catch.
Actually I'm not sure how I could miss this.
Thanks!
Test build #103419 has finished for PR 24073 at commit
```
@@ -3112,29 +3112,30 @@ case class ArrayDistinct(child: Expression)
      (data: Array[AnyRef]) => new GenericArrayData(data.distinct.asInstanceOf[Array[Any]])
    } else {
      (data: Array[AnyRef]) => {
        var foundNullElement = false
        var pos = 0
        val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
```
Wouldn't this be an `ArrayBuffer[Array[AnyRef]]`?
We are returning a `GenericArrayData(arrayBuffer)`, and `GenericArrayData` takes a parameter of `Array[Any]`, so we are okay here, no? I looked at the existing implementations of `ArrayUnion` and `ArrayExcept` for reference.
That's fine; you can pass an array of anything to it then (right? or is there some compile-time issue I'm not thinking of). It's not that it doesn't work, but this code could be more precise locally. No big deal.
@srowen Sorry, a little confused. We have an input which is an `Array[AnyRef]`. Now if we declare the temporary buffer as an `ArrayBuffer[Array[AnyRef]]`, how do we populate its content? Example:

- Input 1: `Array[Integer]` => `Seq(1, 2, , 1)`; in this case our output is `ArrayBuffer[Int] = Array(1, 2)`
- Input 2: `Array[Array[Integer]]` => `Seq(Seq(1, 2), Seq(3, 4), Seq(3, 4))`; in this case our output is `ArrayBuffer[Array[Int]] => Array(Array(1, 2), Array(3, 4))`
- Input 3: `Array[Struct]` => `Seq(struct(...), struct(...))`

Thanks for your help.
Disregard this, I'm mistaken. The use case here was arrays of arrays, but this code isn't handling only array elements. Can the type be `AnyRef`, though?
@srowen Thank you. Sure, I will change it to `AnyRef`.
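As a small aside, here is a standalone sketch of the typing point; the `consume` function is a hypothetical stand-in for `GenericArrayData`, which per the discussion takes an `Array[Any]`. An `ArrayBuffer[AnyRef]` matches the incoming `Array[AnyRef]` elements more precisely than `ArrayBuffer[Any]`, and can still be widened at the boundary.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for GenericArrayData, which takes Array[Any].
def consume(xs: Array[Any]): Int = xs.length

val buf = new ArrayBuffer[AnyRef]
buf += "a"
buf += Array(1, 2) // a nested array is itself an AnyRef, so arrays of arrays fit
println(consume(buf.toArray[Any])) // widen AnyRef to Any when converting
```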
```scala
var j = 0;
while (!found && j < arrayBuffer.size) {
  val va = arrayBuffer(j)
  found = (va != null) && ordering.equiv(va, data(i))
```
Rather than handle nulls separately, can you just check it here?

```scala
found = if (va == null) data(i) == null else ordering.equiv(...)
```

It also kind of looks like the ordering already handles nulls?
@srowen Thanks. Actually, in my understanding, the ordering does not seem to handle nulls. Given that, the proposed condition does not work when `va != null` and `data(i)` is null; we get a NullPointerException. We could probably do something like:

```scala
if (data(i) != null && va != null) {
  found = ordering.equiv(va, data(i))
} else if (data(i) == null && va == null) {
  found = true
}
```

Given this, I feel the existing code reads better, and I think it performs better when we have many null and non-null values in the array, by keeping the null handling outside the inner loop. Also, I believe we treat nulls separately in the other collection operation functions. But I will change it if you feel otherwise; please let me know.
You're right; on second look, the ordering in `ArrayType` doesn't handle nulls, only nulls in the array. This is fine. You could save a lookup with:

```scala
if (data(i) == null) {
  found = va == null
} else if (va != null) {
  found = ordering.equiv(va, data(i))
}
```

but I don't feel strongly about it.
@dilipbiswal explained my intention in the `null`-handling part of the code. @srowen's code is simple and easy to read, but it may increase the number of iterations if we have already seen a `null`.
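For readability, here is a condensed, self-contained sketch of the structure being discussed; it is simplified from the PR, and `equiv` stands in for `ordering.equiv`. The `null` case is resolved with one flag before the inner scan, so the buffer is only searched for non-null elements.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified sketch (not the exact PR code) of order-preserving distinct
// with the null handling kept outside the inner loop.
def distinctPreservingOrder(data: Array[AnyRef],
                            equiv: (AnyRef, AnyRef) => Boolean): ArrayBuffer[AnyRef] = {
  val arrayBuffer = new ArrayBuffer[AnyRef]
  var alreadyStoredNull = false
  var i = 0
  while (i < data.length) {
    if (data(i) == null) {
      // Store null at most once; no buffer scan is needed for nulls.
      if (!alreadyStoredNull) {
        arrayBuffer += null
        alreadyStoredNull = true
      }
    } else {
      // Scan the buffer only against non-null entries.
      var found = false
      var j = 0
      while (!found && j < arrayBuffer.size) {
        val va = arrayBuffer(j)
        found = (va != null) && equiv(va, data(i))
        j += 1
      }
      if (!found) arrayBuffer += data(i)
    }
    i += 1
  }
  arrayBuffer
}

// e.g. distinctPreservingOrder(Array("a", null, "a", null), _ == _)
// yields ArrayBuffer("a", null).
```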
```scala
pos = pos + 1
if (data(i) != null) {
  found = false
  var j = 0;
```
Remove semicolon
@srowen will do.
```scala
var pos = 0
val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
var alreadyStoredNull = false
var found = false
```
Move this `var` inside the `if` statement below?
@srowen ok.
Test build #103467 has started for PR 24073 at commit

@HyukjinKwon Hello, the test seems to be stuck.

Jenkins has weird behaviours now and it seems the tests stopped in #24028 (comment), too.
retest this please

Test build #103479 has finished for PR 24073 at commit

retest this please

Test build #103482 has finished for PR 24073 at commit

Test build #103515 has finished for PR 24073 at commit
This reverts commit 60eb195.
This seems fine. Is there a more general problem of this form? Someone else mentioned it might not be specific to arrays. It's OK to consider that separately if so; just checking whether there is another very similar fix to be made elsewhere.
LGTM, thank you for fixing this mistake.

Jenkins, retest this please.

Test build #103537 has finished for PR 24073 at commit
…th columns containing array of array

## What changes were proposed in this pull request?

Correct the logic to compute the distinct. Below is a small repro snippet.

```
scala> val df = Seq(Seq(Seq(1, 2), Seq(1, 2), Seq(1, 2), Seq(3, 4), Seq(4, 5))).toDF("array_col")
df: org.apache.spark.sql.DataFrame = [array_col: array<array<int>>]

scala> val distinctDF = df.select(array_distinct(col("array_col")))
distinctDF: org.apache.spark.sql.DataFrame = [array_distinct(array_col): array<array<int>>]

scala> df.show(false)
+----------------------------------------+
|array_col                               |
+----------------------------------------+
|[[1, 2], [1, 2], [1, 2], [3, 4], [4, 5]]|
+----------------------------------------+
```

Error

```
scala> distinctDF.show(false)
+-------------------------+
|array_distinct(array_col)|
+-------------------------+
|[[1, 2], [1, 2], [1, 2]] |
+-------------------------+
```

Expected result

```
scala> distinctDF.show(false)
+-------------------------+
|array_distinct(array_col)|
+-------------------------+
|[[1, 2], [3, 4], [4, 5]] |
+-------------------------+
```

## How was this patch tested?

Added an additional test.

Closes #24073 from dilipbiswal/SPARK-27134.

Authored-by: Dilip Biswal <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit aea9a57)
Signed-off-by: Sean Owen <[email protected]>
Merged to master/2.4. It didn't pick cleanly into 2.3, and I wasn't clear whether it affected 2.3.

@srowen Actually, this function was added in 2.4, so we should be good :-)