Support Partitioning Data by Dictionary Encoded String Array Types #7896
@@ -28,8 +28,9 @@ use crate::error::Result;
 use crate::physical_plan::SendableRecordBatchStream;

 use arrow_array::builder::UInt64Builder;
-use arrow_array::cast::AsArray;
-use arrow_array::{RecordBatch, StructArray};
+use arrow_array::cast::{as_dictionary_array, AsArray};
+use arrow_array::types::{Int32Type, UInt16Type};
+use arrow_array::{RecordBatch, StringArray, StructArray};
 use arrow_schema::{DataType, Schema};
 use datafusion_common::cast::as_string_array;
 use datafusion_common::DataFusionError;
@@ -310,7 +311,37 @@ fn compute_partition_keys_by_row<'a>(
                 for i in 0..rb.num_rows() {
                     partition_values.push(array.value(i));
                 }
-            }
+            },
+            DataType::Dictionary(key_type, _) => {
+                match **key_type {
+                    DataType::UInt16 => {
Review comment: It would be nice to not have to enumerate every possible key_type here. It would be nice if there was a way to cast any …

Reply (@suremarc): Hey Devin, there is a downcast_dictionary_array! macro for this. You can see the code I used here:

    DataType::Dictionary(_, value_type)
        if value_type.as_ref() == &DataType::Utf8 =>
    {
        downcast_dictionary_array!(
            col_array => {
                let array = col_array.downcast_dict::<StringArray>().unwrap();
                for i in 0..rb.num_rows() {
                    partition_values.push(array.value(i));
                }
            },
            _ => unreachable!(),
        )
    }

Reply: Thanks @suremarc for the pointer to that macro! I updated this PR and it seems to work as expected. I also added a new test based on a dictionary-encoded parquet file from one of the testing submodules. It is working without error, but the values look a bit off; not sure at this point whether that is related to the downcasting or something else.
+                        let dict_array = as_dictionary_array::<UInt16Type>(col_array);
+                        let array = dict_array.downcast_dict::<StringArray>()
+                            .ok_or(DataFusionError::NotImplemented(format!("It is not yet supported to write to hive partitions with datatype {}", dtype)))?;
+                        for val in array.into_iter() {
+                            partition_values.push(
+                                val.ok_or(DataFusionError::Execution("Partition values cannot be null!".into()))?
+                            );
+                        }
+                    },
+                    DataType::Int32 => {
+                        let dict_array = as_dictionary_array::<Int32Type>(col_array);
+                        let array = dict_array.downcast_dict::<StringArray>()
+                            .ok_or(DataFusionError::NotImplemented(format!("It is not yet supported to write to hive partitions with datatype {}", dtype)))?;
+                        for val in array.into_iter() {
+                            partition_values.push(
+                                val.ok_or(DataFusionError::Execution("Partition values cannot be null!".into()))?
+                            );
+                        }
+                    },

Review comment (on the null check): Traditionally, null values are sent to a default partition rather than rejected: https://cwiki.apache.org/confluence/display/Hive/Tutorial#Tutorial-Dynamic-PartitionInsert
+                    _ => {
+                        return Err(DataFusionError::NotImplemented(format!(
+                            "It is not yet supported to write to hive partitions with datatype {}",
+                            dtype
+                        )))
+                    }
+                }
+            },
+            _ => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "it is not yet supported to write to hive partitions with datatype {}",
Review comment: I am somewhat worried about this change, as it effectively changes the meaning of the function to also include logical types. I think we should either update the comments to reflect this change, or (my preference) make a new function that is explicit. Perhaps something like: …

Reply: I think your concern is justified, as the optimizer also relies on this function and might have stricter equivalence requirements. I created a separate method for logical equivalence, which allows for different dictionary encodings as long as the values can ultimately be resolved to the same type. The optimizer continues to use the original semantic equivalence method, and I've updated the insert_into methods to use the softer logical equivalence check.