
[SPARK-27134][SQL] array_distinct function does not work correctly with columns containing array of array #24073

Closed
wants to merge 7 commits into master from dilipbiswal/SPARK-27134

Conversation

@dilipbiswal (Contributor) commented Mar 12, 2019

What changes were proposed in this pull request?

Correct the logic array_distinct uses to compute the distinct elements of an array.

Below is a small repro snippet.

```
scala> val df = Seq(Seq(Seq(1, 2), Seq(1, 2), Seq(1, 2), Seq(3, 4), Seq(4, 5))).toDF("array_col")
df: org.apache.spark.sql.DataFrame = [array_col: array<array<int>>]

scala> val distinctDF = df.select(array_distinct(col("array_col")))
distinctDF: org.apache.spark.sql.DataFrame = [array_distinct(array_col): array<array<int>>]

scala> df.show(false)
+----------------------------------------+
|array_col                               |
+----------------------------------------+
|[[1, 2], [1, 2], [1, 2], [3, 4], [4, 5]]|
+----------------------------------------+
```

Error

```
scala> distinctDF.show(false)
+-------------------------+
|array_distinct(array_col)|
+-------------------------+
|[[1, 2], [1, 2], [1, 2]] |
+-------------------------+
```

Expected result

```
scala> distinctDF.show(false)
+-------------------------+
|array_distinct(array_col)|
+-------------------------+
|[[1, 2], [3, 4], [4, 5]] |
+-------------------------+
```

How was this patch tested?

Added an additional test.
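
For context, here is a minimal standalone sketch of the corrected approach in plain Scala (not the actual Catalyst expression code; distinctSketch and equiv are illustrative stand-ins for the expression's eval logic and ordering.equiv): rather than slicing the original array, each element is kept the first time it is seen.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the fixed approach: keep the first occurrence of each element
// (by the supplied equivalence), preserving encounter order. This is the
// O(n^2) path needed for element types with no usable equals/hashCode,
// such as nested arrays and binary.
def distinctSketch(data: Array[AnyRef], equiv: (AnyRef, AnyRef) => Boolean): Array[AnyRef] = {
  val kept = new ArrayBuffer[AnyRef]
  var alreadyStoredNull = false
  for (elem <- data) {
    if (elem != null) {
      // Linear scan over the elements kept so far.
      if (!kept.exists(v => v != null && equiv(v, elem))) kept += elem
    } else if (!alreadyStoredNull) {
      // Nulls are handled outside the comparison: store at most one.
      kept += null
      alreadyStoredNull = true
    }
  }
  kept.toArray
}

// The PR's repro, reduced: nested arrays compared element-wise.
val input: Array[AnyRef] =
  Array(Array(1, 2), Array(1, 2), Array(1, 2), Array(3, 4), Array(4, 5))
val out = distinctSketch(input, (a, b) =>
  java.util.Arrays.equals(a.asInstanceOf[Array[Int]], b.asInstanceOf[Array[Int]]))
// out now holds [1, 2], [3, 4], [4, 5] -- the expected result above.
```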

@SparkQA commented Mar 12, 2019

Test build #103388 has finished for PR 24073 at commit 860ed87.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 13, 2019

Test build #103391 has finished for PR 24073 at commit 049bf9b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal (Contributor Author)

cc @ushin @kiszk

@SparkQA commented Mar 13, 2019

Test build #103395 has finished for PR 24073 at commit d59b0f7.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal (Contributor Author)

retest this please

@SparkQA commented Mar 13, 2019

Test build #103409 has finished for PR 24073 at commit d59b0f7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal (Contributor Author)

retest this please

@maropu (Member) commented Mar 13, 2019

Can you put an example query to reproduce this bug in the PR description?

@kiszk (Member) commented Mar 13, 2019

cc @ueshin

@kiszk (Member) commented Mar 13, 2019

Thanks, let me see later.

@dilipbiswal (Contributor Author)

@maropu Updated the PR description. Thank you.

```
            }
          }
        }
        new GenericArrayData(data.slice(0, pos))
```
@viirya (Member):

Looks like the original implementation assumes the duplicate items are placed at the end of the data? I think it affects not only arrays of arrays, but also other element types like BinaryType.
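
To make the failure mode concrete, a hypothetical simplification (not the actual expression code) of what returning a prefix slice implies:

```scala
// Hypothetical simplification of the old behavior: `pos` ends up holding
// the number of distinct values, but the data itself is never rearranged,
// so a prefix slice keeps whatever happens to sit at the front.
val data = Array("x", "x", "x", "y", "z")
val pos  = data.distinct.length   // 3 distinct values
data.slice(0, pos)                // Array(x, x, x) -- wrong unless the
                                  // duplicates all sit at the end
```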

@viirya (Member):

This test case fails on current master, but passes after this change.

```
val a8 = Literal.create(Seq(2, 1, 2, 3, 4, 4, 5).map(_.toString.getBytes), ArrayType(BinaryType))
checkEvaluation(new ArrayDistinct(a8), Seq(2, 1, 3, 4, 5).map(_.toString.getBytes))
```
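
For background on why BinaryType hits the same bug: JVM arrays, including Array[Byte], compare by reference, so a generic distinct cannot deduplicate them, and an element-type-aware comparison such as ordering.equiv is needed. A quick plain-Scala check:

```scala
val a = "1".getBytes
val b = "1".getBytes
a == b                            // false: byte arrays compare by reference
java.util.Arrays.equals(a, b)     // true: element-wise comparison
Array(a, b).distinct.length       // 2, not 1 -- distinct relies on ==
```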

@dilipbiswal (Contributor Author) commented Mar 13, 2019:

@viirya

> Looks like the original implementation assumes the duplicate items are placed at the end of the data?

Probably. The thing is, it does not rearrange the data in any way, so I don't see how we can just return a slice of the original array.

> I think it affects not only arrays of arrays, but also other element types like BinaryType.

Yeah, I will add your test case or enhance the Array[Binary] test case.

Member:

Good catch.
Actually I'm not sure how I could miss this.
Thanks!

@SparkQA commented Mar 13, 2019

Test build #103419 has finished for PR 24073 at commit d59b0f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
@@ -3112,29 +3112,30 @@ case class ArrayDistinct(child: Expression)
      (data: Array[AnyRef]) => new GenericArrayData(data.distinct.asInstanceOf[Array[Any]])
    } else {
      (data: Array[AnyRef]) => {
        var foundNullElement = false
        var pos = 0
        val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
```
@srowen (Member):

Wouldn't this be an ArrayBuffer[Array[AnyRef]]?

@dilipbiswal (Contributor Author) commented Mar 13, 2019:

We are returning a GenericArrayData(arrayBuffer), and GenericArrayData takes a parameter of type Array[Any], so we are okay here, no? I looked at the existing implementations of ArrayUnion and ArrayExcept for reference.

@srowen (Member):

That's fine; you can pass an Array of anything to it then (right? Or is there some compile-time issue I'm not thinking of?). It's not that it doesn't work, but this code could locally be more precise. No big deal.

@dilipbiswal (Contributor Author):

@srowen Sorry, a little confused. We have an input which is an Array[AnyRef]. Now, if we declare the temporary buffer as an ArrayBuffer[Array[AnyRef]], how do we populate its contents for element types that are not arrays? For example:

Input1: Array[Integer] => Seq(1, 2, 1); output: ArrayBuffer[Int] => Array(1, 2)
Input2: Array[Array[Integer]] => Seq(Seq(1, 2), Seq(3, 4), Seq(3, 4)); output: ArrayBuffer[Array[Int]] => Array(Array(1, 2), Array(3, 4))
Input3: Array[Struct] => Seq(struct(...), struct(...))

Thanks for your help.

@srowen (Member):

Disregard this; I'm mistaken. The use case here was arrays of arrays, but this code isn't handling only array elements. Can the type be AnyRef, though?

@dilipbiswal (Contributor Author):

@srowen Thank you. Sure, I will change it to AnyRef.
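
For reference, a small sketch of why either element type compiles here, assuming GenericArrayData's Seq-based auxiliary constructor (illustrative only, not code from this PR):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.util.GenericArrayData

// GenericArrayData has an auxiliary constructor taking Seq[Any]; since Seq
// is covariant, an ArrayBuffer[AnyRef] is accepted just as readily as an
// ArrayBuffer[Any], while being a more precise local type.
val asAny: ArrayBuffer[Any]    = ArrayBuffer(1, 2, 3)
val asRef: ArrayBuffer[AnyRef] = ArrayBuffer("a", "b")
new GenericArrayData(asAny)
new GenericArrayData(asRef)
```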

```
          var j = 0;
          while (!found && j < arrayBuffer.size) {
            val va = arrayBuffer(j)
            found = (va != null) && ordering.equiv(va, data(i))
```
@srowen (Member):

Rather than handle nulls separately, can you just check it here?

found = if (va == null) data(i) == null else ordering.equiv(...)

It also kind of looks like the ordering already handles nulls?

@dilipbiswal (Contributor Author):

@srowen Thanks. Actually, in my understanding, the ordering does not handle nulls. Given that, the proposed condition does not work when va != null and data(i) is null: we get a NullPointerException. We could probably do something like:

```
if (data(i) != null && va != null) {
  found = ordering.equiv(va, data(i))
} else if (data(i) == null && va == null) {
  found = true
}
```

Given this, I feel the existing code reads better, and I think it performs better when the array has many null and non-null values, since the null handling stays outside the inner loop. Also, I believe the other collection functions treat nulls separately. But I will change it if you feel otherwise. Please let me know.

@srowen (Member):

You're right; on second look, the ordering in ArrayType doesn't handle nulls, only nulls inside the array. This is fine. You could save a lookup with:

```
if (data(i) == null) {
  found = va == null
} else if (va != null) {
  found = ordering.equiv(va, data(i))
}
```

but I don't feel strongly about it.

@kiszk (Member):

@dilipbiswal explained my intention behind the null-handling part of the code. @srowen's code is simple and easy to read, but it may increase the number of iterations once a null has already been seen.

```
          pos = pos + 1
          if (data(i) != null) {
            found = false
            var j = 0;
```
@srowen (Member):

Remove semicolon

@dilipbiswal (Contributor Author):

@srowen will do.

```
        var pos = 0
        val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
        var alreadyStoredNull = false
        var found = false
```
@srowen (Member):

Move this var inside the if statement below?

@dilipbiswal (Contributor Author) commented Mar 13, 2019:

@srowen ok.

@SparkQA commented Mar 14, 2019

Test build #103467 has started for PR 24073 at commit dfdd109.

@dilipbiswal (Contributor Author)

@HyukjinKwon Hello, the test run seems to be stuck in ApproxCountDistinctForIntervalsQuerySuite, which seems unrelated to the change in this PR. Is there a way to stop/kill this run?

@maropu (Member) commented Mar 14, 2019

Jenkins is behaving oddly right now, and it seems the tests stalled in #24028 (comment), too.

@maropu (Member) commented Mar 14, 2019

retest this please

@SparkQA commented Mar 14, 2019

Test build #103479 has finished for PR 24073 at commit dfdd109.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal (Contributor Author)

retest this please

@SparkQA commented Mar 14, 2019

Test build #103482 has finished for PR 24073 at commit dfdd109.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 15, 2019

Test build #103515 has finished for PR 24073 at commit 60eb195.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin (Member) commented Mar 15, 2019

LGTM.
Rethinking after #24073 (comment), I agree with @kiszk that we should skip traversing the ArrayBuffer once a null has been found.
@srowen Could you take another look, please?
Thanks!

This reverts commit 60eb195.
@dilipbiswal (Contributor Author)

@kiszk @ueshin @srowen Thanks a lot for reviewing. I have reverted the last commit, in which I had removed the special-cased null handling. It should be OK now. Thanks.

@srowen (Member) left a comment:

This seems fine. Is there a more general problem of this form? Someone else mentioned it might not be specific to arrays. It's OK to consider that separately if so; I'm just checking whether there is another very similar fix to be made elsewhere.

@kiszk (Member) commented Mar 15, 2019

LGTM, thank you for fixing this mistake.

@ueshin (Member) commented Mar 15, 2019

Jenkins, retest this please.

@SparkQA commented Mar 15, 2019

Test build #103537 has finished for PR 24073 at commit 1bde41c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

srowen pushed a commit that referenced this pull request Mar 16, 2019
[SPARK-27134][SQL] array_distinct function does not work correctly with columns containing array of array

Closes #24073 from dilipbiswal/SPARK-27134.

Authored-by: Dilip Biswal <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit aea9a57)
Signed-off-by: Sean Owen <[email protected]>
@srowen (Member) commented Mar 16, 2019

Merged to master/2.4. It didn't cherry-pick cleanly into 2.3, and I wasn't sure whether it affected 2.3.

@srowen closed this in aea9a57 Mar 16, 2019
@dilipbiswal (Contributor Author) commented Mar 16, 2019

Thanks a lot @viirya @srowen @ueshin @kiszk

@dilipbiswal (Contributor Author)

@srowen Actually this function was added in 2.4. So we should be good :-)

kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
[SPARK-27134][SQL] array_distinct function does not work correctly with columns containing array of array
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 25, 2019
[SPARK-27134][SQL] array_distinct function does not work correctly with columns containing array of array
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019
[SPARK-27134][SQL] array_distinct function does not work correctly with columns containing array of array
MTelling pushed a commit to palantir/spark that referenced this pull request Feb 18, 2021
[SPARK-27134][SQL] array_distinct function does not work correctly with columns containing array of array
rshkv pushed a commit to palantir/spark that referenced this pull request Feb 18, 2021
[SPARK-27134][SQL] array_distinct function does not work correctly with columns containing array of array