-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Validate ScalarUDF output rows and fix nulls for array_has
and get_field
for Map
#10148
Changes from all commits
c94cac8
5d67c73
96f4fe2
5587b03
bba9bfe
9be1a5a
2a93b8c
9743382
7eebcf2
c972d7d
e5bbfaf
ed41d3a
6603135
cc53bd3
cf7fac3
fc304ae
e70245e
cda3e3b
da77fb2
efb1c5f
6c397e8
c1458c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -288,36 +288,40 @@ fn general_array_has_dispatch<O: OffsetSizeTrait>( | |
} else { | ||
array | ||
}; | ||
|
||
for (row_idx, (arr, sub_arr)) in array.iter().zip(sub_array.iter()).enumerate() { | ||
if let (Some(arr), Some(sub_arr)) = (arr, sub_arr) { | ||
let arr_values = converter.convert_columns(&[arr])?; | ||
let sub_arr_values = if comparison_type != ComparisonType::Single { | ||
converter.convert_columns(&[sub_arr])? | ||
} else { | ||
converter.convert_columns(&[element.clone()])? | ||
}; | ||
|
||
let mut res = match comparison_type { | ||
ComparisonType::All => sub_arr_values | ||
.iter() | ||
.dedup() | ||
.all(|elem| arr_values.iter().dedup().any(|x| x == elem)), | ||
ComparisonType::Any => sub_arr_values | ||
.iter() | ||
.dedup() | ||
.any(|elem| arr_values.iter().dedup().any(|x| x == elem)), | ||
ComparisonType::Single => arr_values | ||
.iter() | ||
.dedup() | ||
.any(|x| x == sub_arr_values.row(row_idx)), | ||
}; | ||
|
||
if comparison_type == ComparisonType::Any { | ||
res |= res; | ||
match (arr, sub_arr) { | ||
(Some(arr), Some(sub_arr)) => { | ||
let arr_values = converter.convert_columns(&[arr])?; | ||
let sub_arr_values = if comparison_type != ComparisonType::Single { | ||
converter.convert_columns(&[sub_arr])? | ||
} else { | ||
converter.convert_columns(&[element.clone()])? | ||
}; | ||
|
||
let mut res = match comparison_type { | ||
ComparisonType::All => sub_arr_values | ||
.iter() | ||
.dedup() | ||
.all(|elem| arr_values.iter().dedup().any(|x| x == elem)), | ||
ComparisonType::Any => sub_arr_values | ||
.iter() | ||
.dedup() | ||
.any(|elem| arr_values.iter().dedup().any(|x| x == elem)), | ||
ComparisonType::Single => arr_values | ||
.iter() | ||
.dedup() | ||
.any(|x| x == sub_arr_values.row(row_idx)), | ||
}; | ||
|
||
if comparison_type == ComparisonType::Any { | ||
res |= res; | ||
} | ||
boolean_builder.append_value(res); | ||
} | ||
// respect null input | ||
(_, _) => { | ||
boolean_builder.append_null(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
} | ||
|
||
boolean_builder.append_value(res); | ||
} | ||
} | ||
Ok(Arc::new(boolean_builder.finish())) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,9 @@ | |
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use arrow::array::{Scalar, StringArray}; | ||
use arrow::array::{ | ||
make_array, Array, Capacities, MutableArrayData, Scalar, StringArray, | ||
}; | ||
use arrow::datatypes::DataType; | ||
use datafusion_common::cast::{as_map_array, as_struct_array}; | ||
use datafusion_common::{exec_err, ExprSchema, Result, ScalarValue}; | ||
|
@@ -107,29 +109,55 @@ impl ScalarUDFImpl for GetFieldFunc { | |
); | ||
} | ||
}; | ||
|
||
match (array.data_type(), name) { | ||
(DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => { | ||
let map_array = as_map_array(array.as_ref())?; | ||
let key_scalar = Scalar::new(StringArray::from(vec![k.clone()])); | ||
let keys = arrow::compute::kernels::cmp::eq(&key_scalar, map_array.keys())?; | ||
let entries = arrow::compute::filter(map_array.entries(), &keys)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. using filter will reduce the number of input rows to the number of rows that have keys matching the input key. But we want to respect the number of input rows, and give null for any rows not having the matching key There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand this If the input is like this (two rows, each three elements)
An expression like
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previous implememtation map_array.entries() has type of
With the example above, the layout of field "fields" will be a vector of 2 array, where first array is a list of key, and second array is a list of value
with this computation, the result is a boolean aray where "key" = "c"
and thus this operation will reduce the number of rows into
Problem However, let's add a row where the map does not have key "c" in between
map_array.entries() underneath is represented as
and the return result will be
instead of
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect the result of evaluating
to be:
For example, in duckdb: D create table foo as values (MAP {'a':1, 'b':2, 'c':100}), (MAP{ 'a':1, 'b':2}), (MAP {'a':1, 'b':2, 'c':200});
D select * from foo;
┌───────────────────────┐
│ col0 │
│ map(varchar, integer) │
├───────────────────────┤
│ {a=1, b=2, c=100} │
│ {a=1, b=2} │
│ {a=1, b=2, c=200} │
└───────────────────────┘
D select col0['c'] from foo;
┌───────────┐
│ col0['c'] │
│ int32[] │
├───────────┤
│ [100] │
│ [] │
│ [200] │
└───────────┘ Basically a scalar function has the invarant that each input row produces exactly 1 output row There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, the previous implementation will not return null (according to slt https://github.com/apache/datafusion/pull/10148/files#diff-4db02ec06ada20062cbb518eeb713c2643f5a51e89774bdadd9966410baa7d1dR47) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i also explained in this discussion: #10148 (comment) |
||
let entries_struct_array = as_struct_array(entries.as_ref())?; | ||
Ok(ColumnarValue::Array(entries_struct_array.column(1).clone())) | ||
} | ||
(DataType::Struct(_), ScalarValue::Utf8(Some(k))) => { | ||
let as_struct_array = as_struct_array(&array)?; | ||
match as_struct_array.column_by_name(k) { | ||
None => exec_err!( | ||
"get indexed field {k} not found in struct"), | ||
Some(col) => Ok(ColumnarValue::Array(col.clone())) | ||
(DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => { | ||
let map_array = as_map_array(array.as_ref())?; | ||
let key_scalar: Scalar<arrow::array::GenericByteArray<arrow::datatypes::GenericStringType<i32>>> = Scalar::new(StringArray::from(vec![k.clone()])); | ||
let keys = arrow::compute::kernels::cmp::eq(&key_scalar, map_array.keys())?; | ||
|
||
// note that this array has more entries than the expected output/input size | ||
// because maparray is flatten | ||
let original_data = map_array.entries().column(1).to_data(); | ||
let capacity = Capacities::Array(original_data.len()); | ||
let mut mutable = | ||
MutableArrayData::with_capacities(vec![&original_data], true, | ||
capacity); | ||
|
||
for entry in 0..map_array.len(){ | ||
let start = map_array.value_offsets()[entry] as usize; | ||
let end = map_array.value_offsets()[entry + 1] as usize; | ||
|
||
let maybe_matched = | ||
keys.slice(start, end-start). | ||
iter().enumerate(). | ||
find(|(_, t)| t.unwrap()); | ||
if maybe_matched.is_none(){ | ||
mutable.extend_nulls(1); | ||
continue | ||
} | ||
let (match_offset,_) = maybe_matched.unwrap(); | ||
mutable.extend(0, start + match_offset, start + match_offset + 1); | ||
} | ||
let data = mutable.freeze(); | ||
let data = make_array(data); | ||
Ok(ColumnarValue::Array(data)) | ||
} | ||
(DataType::Struct(_), ScalarValue::Utf8(Some(k))) => { | ||
let as_struct_array = as_struct_array(&array)?; | ||
match as_struct_array.column_by_name(k) { | ||
None => exec_err!("get indexed field {k} not found in struct"), | ||
Some(col) => Ok(ColumnarValue::Array(col.clone())), | ||
} | ||
(DataType::Struct(_), name) => exec_err!( | ||
"get indexed field is only possible on struct with utf8 indexes. \ | ||
Tried with {name:?} index"), | ||
(dt, name) => exec_err!( | ||
"get indexed field is only possible on lists with int64 indexes or struct \ | ||
with utf8 indexes. Tried {dt:?} with {name:?} index"), | ||
} | ||
(DataType::Struct(_), name) => exec_err!( | ||
"get indexed field is only possible on struct with utf8 indexes. \ | ||
Tried with {name:?} index" | ||
), | ||
(dt, name) => exec_err!( | ||
"get indexed field is only possible on lists with int64 indexes or struct \ | ||
with utf8 indexes. Tried {dt:?} with {name:?} index" | ||
), | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5169,8 +5169,9 @@ false false false true | |||||||||||||||||||||||
true false true false | ||||||||||||||||||||||||
true false false true | ||||||||||||||||||||||||
false true false false | ||||||||||||||||||||||||
false false false false | ||||||||||||||||||||||||
false false false false | ||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test result does not look correct, because it ignore some null rows in between |
||||||||||||||||||||||||
NULL NULL false false | ||||||||||||||||||||||||
false false NULL false | ||||||||||||||||||||||||
false false false NULL | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
query BBBB | ||||||||||||||||||||||||
select array_has(arrow_cast(column1, 'LargeList(List(Int64))'), make_array(5, 6)), | ||||||||||||||||||||||||
|
@@ -5183,8 +5184,9 @@ false false false true | |||||||||||||||||||||||
true false true false | ||||||||||||||||||||||||
true false false true | ||||||||||||||||||||||||
false true false false | ||||||||||||||||||||||||
false false false false | ||||||||||||||||||||||||
false false false false | ||||||||||||||||||||||||
NULL NULL false false | ||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I double checked and the datafusion/datafusion/sqllogictest/test_files/array.slt Lines 58 to 68 in cc53bd3
|
||||||||||||||||||||||||
false false NULL false | ||||||||||||||||||||||||
false false false NULL | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
query BBBB | ||||||||||||||||||||||||
select array_has(column1, make_array(5, 6)), | ||||||||||||||||||||||||
|
@@ -5197,8 +5199,9 @@ false false false true | |||||||||||||||||||||||
true false true false | ||||||||||||||||||||||||
true false false true | ||||||||||||||||||||||||
false true false false | ||||||||||||||||||||||||
false false false false | ||||||||||||||||||||||||
false false false false | ||||||||||||||||||||||||
NULL NULL false false | ||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Iikewise I agree this should have 7 output rows datafusion/datafusion/sqllogictest/test_files/array.slt Lines 80 to 90 in cc53bd3
|
||||||||||||||||||||||||
false false NULL false | ||||||||||||||||||||||||
false false false NULL | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
query BBBBBBBBBBBBB | ||||||||||||||||||||||||
select array_has_all(make_array(1,2,3), make_array(1,3)), | ||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,7 @@ DELETE 24 | |
query T | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to remind myself what this data looked like. Here it is for anyone else who may be interested DataFusion CLI v37.1.0
> select * from 'datafusion/core/tests/data/parquet_map.parquet';
+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
| ints | strings | timestamp |
+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
| {bytes: 38906} | {host: 198.194.132.41, method: GET, protocol: HTTP/1.0, referer: https://some.com/this/endpoint/prints/money, request: /observability/metrics/production, status: 400, user-identifier: shaneIxD} | 06/Oct/2023:17:53:45 |
| {bytes: 44606} | {host: 140.115.224.194, method: PATCH, protocol: HTTP/1.0, referer: https://we.org/user/booperbot124, request: /booper/bopper/mooper/mopper, status: 304, user-identifier: jesseddy} | 06/Oct/2023:17:53:45 |
| {bytes: 23517} | {host: 63.69.43.67, method: GET, protocol: HTTP/2.0, referer: https://random.net/booper/bopper/mooper/mopper, request: /booper/bopper/mooper/mopper, status: 550, user-identifier: jesseddy} | 06/Oct/2023:17:53:45 |
| {bytes: 44876} | {host: 69.4.253.156, method: PATCH, protocol: HTTP/1.1, referer: https://some.net/booper/bopper/mooper/mopper, request: /user/booperbot124, status: 403, user-identifier: Karimmove} | 06/Oct/2023:17:53:45 |
| {bytes: 34122} | {host: 239.152.196.123, method: DELETE, protocol: HTTP/2.0, referer: https://for.com/observability/metrics/production, request: /apps/deploy, status: 403, user-identifier: meln1ks} | 06/Oct/2023:17:53:45 |
| {bytes: 37438} | {host: 95.243.186.123, method: DELETE, protocol: HTTP/1.1, referer: https://make.de/wp-admin, request: /wp-admin, status: 550, user-identifier: Karimmove} | 06/Oct/2023:17:53:45 |
| {bytes: 45784} | {host: 66.142.251.66, method: PUT, protocol: HTTP/2.0, referer: https://some.org/apps/deploy, request: /secret-info/open-sesame, status: 403, user-identifier: benefritz} | 06/Oct/2023:17:53:45 |
| {bytes: 27788} | {host: 157.85.140.215, method: GET, protocol: HTTP/1.1, referer: https://random.de/booper/bopper/mooper/mopper, request: /booper/bopper/mooper/mopper, status: 401, user-identifier: devankoshal} | 06/Oct/2023:17:53:45 |
| {bytes: 5344} | {host: 62.191.179.3, method: POST, protocol: HTTP/1.0, referer: https://random.org/booper/bopper/mooper/mopper, request: /observability/metrics/production, status: 400, user-identifier: jesseddy} | 06/Oct/2023:17:53:45 |
| {bytes: 9136} | {host: 237.213.221.20, method: PUT, protocol: HTTP/2.0, referer: https://some.us/this/endpoint/prints/money, request: /observability/metrics/production, status: 304, user-identifier: ahmadajmi} | 06/Oct/2023:17:53:46 |
| {bytes: 5640} | {host: 38.148.115.2, method: GET, protocol: HTTP/1.0, referer: https://for.net/apps/deploy, request: /do-not-access/needs-work, status: 301, user-identifier: benefritz} | 06/Oct/2023:17:53:46 |
... |
||
SELECT strings['not_found'] FROM data LIMIT 1; | ||
---- | ||
NULL | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not familiar with Map. Why should we return null here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will throws the invalidation error i added in this PR. I think the correct behavior is to return null for every input rows not having the associated key. I took a look at duckdb and spark and they also have this behavior
And also, a similar implementation in Datafusion is array_element also return null if the index goes out of range There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is how it works on main: > select strings['not_found'] from 'datafusion/core/tests/data/parquet_map.parquet';
0 row(s) fetched.
Elapsed 0.006 seconds.
Here is how it works on this PR (aka has a single row for each input row) DataFusion CLI v37.1.0
> select strings['not_found'] from '../datafusion/core/tests/data/parquet_map.parquet';
+----------------------------------------------------------------------+
| ../datafusion/core/tests/data/parquet_map.parquet.strings[not_found] |
+----------------------------------------------------------------------+
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| . |
| . |
| . |
+----------------------------------------------------------------------+
209 row(s) fetched. (First 40 displayed. Use --maxrows to adjust)
Elapsed 0.033 seconds. |
||
|
||
statement ok | ||
drop table data; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👌 -- very nice