Implement array_union function #6981

Closed
izveigor opened this issue Jul 16, 2023 · 12 comments · Fixed by #7897
Labels
enhancement New feature or request

Comments

@izveigor
Contributor

izveigor commented Jul 16, 2023

Is your feature request related to a problem or challenge?

Summary

| Characteristic | Description |
| --- | --- |
| Function name | array_union |
| Aliases | list_union |
| Original function? | No |
| Function description | Azure Databricks: Returns an array of the elements in the union of array1 and array2 without duplicates. Spark SQL: Returns an array of the elements that are present in both arrays (all elements from both arrays) without duplicates. |
| Sources | Concept Azure Spark |

Examples:

select array_union([1, 2, 3, 4], [5, 6, 3, 4]);
----
[1, 2, 3, 4, 5, 6]
select array_union([1, 2, 3, 4], [5, 6, 7, 8]);
----
[1, 2, 3, 4, 5, 6, 7, 8]

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

izveigor added the enhancement (New feature or request) label on Jul 16, 2023
@Weijun-H
Member

The example is not correct. It should be:

select array_union([1, 2, 3, 4], [5, 6, 3, 4]);
----
[1, 2, 3, 4, 5, 6]
select array_union([1, 2, 3, 4], [5, 6, 7, 8]);
----
[1, 2, 3, 4, 5, 6, 7, 8]

@izveigor
Contributor Author

Thank you, @Weijun-H! Corrected now.

@Weijun-H
Member

Could I pick this ticket?

@izveigor
Contributor Author

> Could I pick this ticket?

@Weijun-H Yes, you can implement it. Thanks for your initiative!

@edmondop
Contributor

I started doing some work on this, @izveigor, but I am stuck handling the non-trivial case:

main...edmondop:arrow-datafusion:issue-6981?#diff-48cc9cf1bf

I have the following newbie questions:

  • Which types do I need to match against? (data_type, data_type)?
  • Should I implement a deduplication after concat_internal, or should I find a way to rewrite concat_internal that minimizes allocations and keeps a set to avoid adding elements twice?
  • What about when the type is not DataType::List?

@jayzhan211
Contributor

jayzhan211 commented Oct 5, 2023

> Which types do I need to match against? (data_type, data_type)?
> What about when the type is not DataType::List?

I think union with Null is not defined, so you could just match the List type, i.e. (List, List).

> Should I implement a deduplication after concat_internal, or should I find a way to rewrite concat_internal that minimizes allocations and keeps a set to avoid adding elements twice?

It sounds like the latter is better, but if you can't find a way, the first approach is also fine.
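
A minimal sketch (not part of the thread) of the (List, List) match suggested above; check_union_args is a hypothetical helper, and the error variant is just one plausible choice:

use arrow::array::ArrayRef;
use arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result};

// Hypothetical guard: accept only (List, List) argument types
fn check_union_args(lhs: &ArrayRef, rhs: &ArrayRef) -> Result<()> {
    match (lhs.data_type(), rhs.data_type()) {
        (DataType::List(_), DataType::List(_)) => Ok(()),
        (l, r) => Err(DataFusionError::Internal(format!(
            "array_union does not support types {l} and {r}"
        ))),
    }
}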

@edmondop
Contributor

@jayzhan211 I looked deeper into the code, and it seems that:

  • performing deduplication afterwards would require pattern matching on the internal type of the array
  • performing deduplication upon creation would require modifying the MutableArrayData

The latter is here: https://github.com/apache/arrow-rs/blob/03d0505fc864c09e6dcd208d3cdddeecefb90345/arrow-select/src/concat.rs#L111 and would require a separate release of arrow-rs to extend concatenation to use a HashSet internally. On the other hand, in the current arrow-datafusion I can't find any sign of deduplication.

I created a draft PR here https://github.com/apache/arrow-datafusion/pull/7897/files but I am stuck at the moment.

@jayzhan211
Contributor

jayzhan211 commented Oct 22, 2023

@edmondop

> performing deduplication afterwards would require pattern matching on the internal type of the array

We may not need pattern matching on the internal type of the array. Type coercion should already have been done in https://github.com/apache/arrow-datafusion/blob/9fde5c4282fd9f0e3332fb40998bf1562c17fcda/datafusion/optimizer/src/analyzer/type_coercion.rs#L582-L601
Therefore, after concatenation the arrays all share the same data type; just add the values to a HashSet and construct the result back from it.

> performing deduplication upon creation would require modifying the MutableArrayData

I think we can maintain a HashSet for each row, convert the array to primitives (e.g. Vec<i32>) or scalars (e.g. Vec<ScalarValue>), extend all the values on the same row into the same HashSet, then construct the final array back from the HashSets (a sketch follows below). There is no need for concat_internal or arrow::compute::concat.

> would require a separate release of arrow-rs to extend concatenation to use a HashSet internally

I'm not sure how we can benefit from having a HashSet inside arrow-rs. It seems easier to me to have that in DF.
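
A minimal sketch (not from the thread) of that per-row approach, assuming each row's values have already been extracted as ScalarValues; union_one_row is a hypothetical helper:

use std::collections::HashSet;
use datafusion_common::ScalarValue;

// Hypothetical helper: union two rows of already-extracted scalars,
// keeping first-seen order and dropping duplicates.
fn union_one_row(l: Vec<ScalarValue>, r: Vec<ScalarValue>) -> Vec<ScalarValue> {
    let mut seen = HashSet::with_capacity(l.len() + r.len());
    let mut out = Vec::with_capacity(l.len() + r.len());
    for v in l.into_iter().chain(r) {
        // ScalarValue implements Hash + Eq, so the set deduplicates
        if seen.insert(v.clone()) {
            out.push(v);
        }
    }
    out
}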

@edmondop
Contributor

Initial implementation of array union without deduplication #7897

Thanks a lot for your help. So you recommend avoiding concat_internal here. I was looking at array_concat as a starting point for an implementation, but it looks like I might be wrong?

@jayzhan211
Contributor

jayzhan211 commented Oct 22, 2023

I would prefer performing deduplication afterwards, since this seems much more straightforward to me.

  1. Concat N arrays with array_concat or concat_internal
  2. Extend each row of the array into a HashSet, then build the result back from the final HashSet

But you can also try performing dedup without concat; if that is more efficient or readable, we can go with it.

@edmondop
Contributor

> I would prefer performing deduplication afterwards, since this seems much more straightforward to me.
>
>   1. Concat N arrays with array_concat or concat_internal
>   2. Extend each row of the array into a HashSet, then build the result back from the final HashSet
>
> But you can also try performing dedup without concat; if that is more efficient or readable, we can go with it.

Hello @jayzhan211, can you help me figure out my next steps from https://github.com/apache/arrow-datafusion/pull/7897/files#r1369465094? I am still learning the internal API, so the answer might be obvious :)

@tustvold
Contributor

tustvold commented Nov 2, 2023

In response to a question asked in the ASF Slack, here is my best attempt to explain what is going on here.


Scalars vs Arrays

First, in order to understand what is going on, we need to understand the relationship between SQL and Arrow, and in particular how this relates to scalars.

The Arrow data model is concerned with arrays; these can be thought of as columns in a tabular data model.

So an Int64Array might contain the data [1, 2, 3, 4].

If you then wrote a SQL expression like

SQL> col_a + col_b

This will compute a new array col_c where col_c[i] = col_a[i] + col_b[i]

Now when writing SQL queries it is common to want to write something like

SQL> col_a + 3

In this case you want col_c[i] = col_a[i] + 3.

In this case 3 is a scalar value. Arrow only stores arrays, not scalars, so what do you do?

DF uses a ColumnarValue enumeration, which contains either an array or a ScalarValue.

Upstream arrow-rs has a similar concept using a trait called Datum.

Some kernels are able to optimise the case where one or the other side is known to be a scalar, and so explicitly handle these special types.
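
For example, a sketch (not from the original comment) using arrow-rs's Datum-based add kernel; exact module paths and signatures vary by arrow version:

use arrow::array::{Int64Array, Scalar};
use arrow::compute::kernels::numeric::add;

let col_a = Int64Array::from(vec![1, 2, 3, 4]);
// Scalar wraps a one-element array and implements Datum, so the kernel
// can handle the right side as a scalar without expanding it
let three = Scalar::new(Int64Array::from(vec![3]));
let col_c = add(&col_a, &three).unwrap(); // [4, 5, 6, 7]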

In the case of other kernels, DF will expand the scalar value to an array.

In particular consider the expression

SQL> operation(a, 3)

Say operation only handles arrays, and a contains [1, 2, 3, 4].

DF will call the underlying kernel with operation([1, 2, 3, 4], [3, 3, 3, 3])
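
A sketch (not from the original comment) of that expansion using DataFusion's ScalarValue; the exact signature varies across versions:

use datafusion_common::ScalarValue;

// The literal 3 arrives as a ScalarValue...
let scalar = ScalarValue::Int64(Some(3));
// ...and is expanded to the length of the array argument before the
// kernel is invoked (newer DataFusion versions return a Result here)
let expanded = scalar.to_array_of_size(4); // Int64Array of [3, 3, 3, 3]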


ListArray Union

Now we can get back to talking about array_union

Consider the example SQL

SQL> array_union([1, 2, 3, 4], [5, 6, 7, 8])

Both arguments are scalars, and so DF will expand them to arrays as above, although only to single-element arrays in this case, because both arguments are actually scalar.

So the arguments to the array_union kernel are actually 2-dimensional, i.e.

RUST> array_union([[1, 2, 3, 4]], [[5, 6, 7, 8]])

The kernel then wants to perform the union operation for each element of the input list arrays (in this case, each element is a list of integers):

let l = l.as_list::<OffsetSize>();
let r = r.as_list::<OffsetSize>();
let out = ...
for (l_value, r_value) in l.iter().zip(r) {
    out.push(union(l_value, r_value));
}

Now this is where it gets tricky, as not only do we need to deduplicate the list values, but also incrementally build an output list array with the results.

One way this might be achieved is something like (not tested at all)

let l = l.as_list::<OffsetSize>();
let r = r.as_list::<OffsetSize>();

let converter = RowConverter::new(...);
let mut dedup = HashSet::new();

// Convert the flat child arrays to row format, so values of any type
// can be hashed and compared bytewise
let l_values = converter.convert_columns(&[l.values().clone()]).unwrap();
let r_values = converter.convert_columns(&[r.values().clone()]).unwrap();

// Might be worth adding an upstream OffsetBufferBuilder
let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
offsets.push(OffsetSize::usize_as(0));
let mut rows = Vec::with_capacity(l_values.num_rows() + r_values.num_rows());

for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
    let l_slice = l_w[0].as_usize()..l_w[1].as_usize(); // The slice of `l_values` comprising the current list element
    let r_slice = r_w[0].as_usize()..r_w[1].as_usize(); // The slice of `r_values` comprising the current list element

    l_slice.for_each(|i| {
        dedup.insert(l_values.row(i));
    });
    r_slice.for_each(|i| {
        dedup.insert(r_values.row(i));
    });

    // Drain the deduplicated rows into the output and record where
    // this list element ends
    rows.extend(dedup.drain());
    offsets.push(OffsetSize::usize_as(rows.len()));
}

let mut values = converter.convert_rows(rows).unwrap();
let offsets = OffsetBuffer::new(offsets.into());
let nulls = NullBuffer::union(l.nulls(), r.nulls());

// `field` is the list element field of the inputs (not shown above)
Ok(GenericListArray::<OffsetSize>::new(field, offsets, values.remove(0), nulls))

This relies on the particulars of how ListArray is represented, which is explained in a bit more detail here.
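
For reference, a small sketch (not from the original comment) of that layout using arrow-rs:

use std::sync::Arc;
use arrow::array::{Int64Array, ListArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field};

// The list array [[1, 2], [3], []] is stored as a flat child array of
// values plus an offsets buffer with one more entry than there are
// lists: list i spans values[offsets[i]..offsets[i + 1]]
let values = Int64Array::from(vec![1, 2, 3]);
let offsets = OffsetBuffer::new(vec![0, 2, 3, 3].into());
let field = Arc::new(Field::new("item", DataType::Int64, true));
let list = ListArray::new(field, offsets, Arc::new(values), None);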

Unfortunately the operation being performed here is not a natural fit for a selection kernel, because it is selecting particular groups of values from list children, so there isn't really a way to avoid dealing with the reality of how these arrays are encoded.

Dealing with the offsets and values directly avoids fighting the borrow checker for the lifetime of rows and avoids downcasting for each list element. Additionally, using RowConverter means this will generalise to lists of any value type, including lists or structs, whilst avoiding having to generate specialized code for each value type, which is a problematic pattern (#7988).

I hope this at least clarifies some things...
