Support Partitioning Data by Dictionary Encoded String Array Types #7896

Merged
merged 7 commits into from
Oct 28, 2023
Changes from 2 commits
9 changes: 9 additions & 0 deletions datafusion/common/src/dfschema.rs
@@ -420,6 +420,15 @@ impl DFSchema {
                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
            }
            // The next two cases allow for the possibility that one schema has a dictionary encoded array
Contributor

I am somewhat worried about this change (as it effectively changes the meaning of the function to also include logical types).

I think we should either update the comments to reflect this change, or (my preference) make a new function that is explicit. Perhaps something like:

    /// Returns true if two [`DataType`]s have the same logical data type:
    /// either the same name and type, or, if one is dictionary encoded,
    /// the same name and type of the values.
    /// E.g. Dictionary(_, Utf8) is semantically equivalent to Utf8 since both represent an array of strings
    fn datatype_has_same_logical_type(dt1: &DataType, dt2: &DataType) -> bool {
        match (dt1, dt2) {
            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
            _ => Self::datatype_is_semantically_equal(dt1, dt2),
        }
    }

Contributor Author

I think your concern is justified, as the optimizer also relies on this function and may have stricter equivalence requirements. I created a separate method for logical equivalence that allows for different dictionary encodings as long as the values ultimately resolve to the same type.

The optimizer continues to use the original semantic equivalence method, and I've updated the insert_into methods to use the more lenient logical equivalence check.
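
For illustration, here is a minimal free-standing sketch of that logical-equivalence idea. The function name, the recursive unwrapping, and the plain == fallback (standing in for datatype_is_semantically_equal) are assumptions for this sketch, not necessarily the exact code merged in this PR:

    use arrow_schema::DataType;

    /// Sketch: two types are "logically" equal if they compare equal after
    /// unwrapping any dictionary encoding on either side.
    fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
        match (dt1, dt2) {
            // E.g. Dictionary(Int32, Utf8) is logically equal to Utf8
            (DataType::Dictionary(_, v1), other) | (other, DataType::Dictionary(_, v1)) => {
                datatype_is_logically_equal(v1.as_ref(), other)
            }
            // Stand-in for the stricter DFSchema::datatype_is_semantically_equal check
            _ => dt1 == dt2,
        }
    }

    fn main() {
        let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        assert!(datatype_is_logically_equal(&dict, &DataType::Utf8));
        assert!(!datatype_is_logically_equal(&dict, &DataType::Int64));
    }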

            // and the other has an equivalent non dictionary encoded array of the same type
            // E.g. Dictionary(_, Utf8) is semantically equivalent to Utf8 since both represent an array of strings
            (DataType::Dictionary(_, v1), othertype) => {
                v1.as_ref() == othertype
            }
            (othertype, DataType::Dictionary(_, v1)) => {
                v1.as_ref() == othertype
            }
            (DataType::List(f1), DataType::List(f2))
            | (DataType::LargeList(f1), DataType::LargeList(f2))
            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _))
37 changes: 34 additions & 3 deletions datafusion/core/src/datasource/file_format/write/demux.rs
@@ -28,8 +28,9 @@ use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;

use arrow_array::builder::UInt64Builder;
use arrow_array::cast::AsArray;
use arrow_array::{RecordBatch, StructArray};
use arrow_array::cast::{as_dictionary_array, AsArray};
use arrow_array::types::{Int32Type, UInt16Type};
use arrow_array::{RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::cast::as_string_array;
use datafusion_common::DataFusionError;
@@ -310,7 +311,37 @@ fn compute_partition_keys_by_row<'a>(
            for i in 0..rb.num_rows() {
                partition_values.push(array.value(i));
            }
        }
        },
        DataType::Dictionary(key_type, _) => {
            match **key_type {
                DataType::UInt16 => {
Contributor Author
@devinjdangelo Oct 21, 2023

It would be nice not to have to enumerate every possible key_type here. Ideally there would be a way to cast any Dictionary(_, V) -> V, or at least to iterate over any Dictionary(_, V) and pull out the V values in order without caring what type the key is.

Contributor

Hey Devin, there is a downcast_dictionary_array! macro that I encountered while attempting to fix this issue. (I was going to submit a PR but was figuring out if/how to add tests!) It lets you write code once that works for all key types, if I am not mistaken. I saw it used elsewhere in DataFusion code: https://github.com/apache/arrow-datafusion/blob/9fde5c4282fd9f0e3332fb40998bf1562c17fcda/datafusion/common/src/hash_utils.rs#L326-L329

You can see the code I used here:
https://github.com/polygon-io/arrow-datafusion/blob/8c955891a6e5b6002faa07e54a67a6ec93347ade/datafusion/core/src/datasource/file_format/write/demux.rs#L316-L328

            DataType::Dictionary(_, value_type)
                if value_type.as_ref() == &DataType::Utf8 =>
            {
                downcast_dictionary_array!(
                    col_array =>  {
                        let array = col_array.downcast_dict::<StringArray>().unwrap();
                        for i in 0..rb.num_rows() {
                            partition_values.push(array.value(i));
                        }
                    },
                    _ => unreachable!(),
                )
            }
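
To make the macro's behavior concrete, here is a standalone sketch (the array construction and the helper name are illustrative; in the PR the column comes from a RecordBatch) showing that one code path handles any dictionary key type:

    use std::sync::Arc;

    use arrow_array::types::{Int8Type, UInt16Type};
    use arrow_array::{downcast_dictionary_array, Array, ArrayRef, DictionaryArray, StringArray};

    /// Collect the string values of a dictionary-encoded column without
    /// naming the key type.
    fn collect_partition_values(col_array: &dyn Array) -> Vec<String> {
        let mut partition_values = Vec::with_capacity(col_array.len());
        downcast_dictionary_array!(
            // col_array is rebound to the concrete DictionaryArray<K>, whatever K is
            col_array => {
                let array = col_array
                    .downcast_dict::<StringArray>()
                    .expect("expected string dictionary values");
                for val in array.into_iter() {
                    // null partition values are handled separately in the PR; default to "" here
                    partition_values.push(val.unwrap_or("").to_string());
                }
            },
            _ => unreachable!("expected a dictionary array"),
        );
        partition_values
    }

    fn main() {
        // Same logical values, two different key types
        let a: DictionaryArray<Int8Type> = vec!["x", "y", "x"].into_iter().collect();
        let b: DictionaryArray<UInt16Type> = vec!["x", "y", "x"].into_iter().collect();
        let a: ArrayRef = Arc::new(a);
        let b: ArrayRef = Arc::new(b);
        assert_eq!(collect_partition_values(a.as_ref()), vec!["x", "y", "x"]);
        assert_eq!(collect_partition_values(b.as_ref()), vec!["x", "y", "x"]);
    }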

Contributor Author

Thanks @suremarc for the pointer to that macro! I updated this PR and it seems to work as expected!

I also added a new test based on a dictionary-encoded parquet file from one of the testing submodules. It runs without error, but the values look a bit off; not sure at this point whether that's related to the downcasting or something else.

                    let dict_array = as_dictionary_array::<UInt16Type>(col_array);
                    let array = dict_array.downcast_dict::<StringArray>()
                        .ok_or(DataFusionError::NotImplemented(format!("It is not yet supported to write to hive partitioned with datatype {}", dtype)))?;
                    for val in array.into_iter() {
                        partition_values.push(
                            val.ok_or(DataFusionError::Execution("Partition values cannot be null!".into()))?
                        );
                    }
                },
                DataType::Int32 => {
                    let dict_array = as_dictionary_array::<Int32Type>(col_array);
                    let array = dict_array.downcast_dict::<StringArray>()
                        .ok_or(DataFusionError::NotImplemented(format!("It is not yet supported to write to hive partitioned with datatype {}", dtype)))?;
                    for val in array.into_iter() {
                        partition_values.push(
                            val.ok_or(DataFusionError::Execution("Partition values cannot be null!".into()))?
Contributor Author
@devinjdangelo Oct 21, 2023

Traditionally, null values are sent to __HIVE_DEFAULT_PARTITION__, but IMO it is also a reasonable choice to throw an error instead. If the user desires the traditional behavior, they can write COALESCE(part_col, '__HIVE_DEFAULT_PARTITION__') AS part_col.

https://cwiki.apache.org/confluence/display/Hive/Tutorial#Tutorial-Dynamic-PartitionInsert
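
As a small side-by-side illustration of that choice (the helper and its signature are hypothetical, not DataFusion APIs), null partition values could either error, as this PR does, or fall back to the traditional Hive bucket:

    // Hypothetical helper contrasting the two null-handling policies discussed above.
    const HIVE_DEFAULT_PARTITION: &str = "__HIVE_DEFAULT_PARTITION__";

    fn partition_key(value: Option<&str>, route_nulls_to_default: bool) -> Result<String, String> {
        match value {
            Some(v) => Ok(v.to_string()),
            // Traditional Hive behavior: nulls land in the default partition
            None if route_nulls_to_default => Ok(HIVE_DEFAULT_PARTITION.to_string()),
            // Behavior chosen in this PR: writing a null partition value is an error
            None => Err("Partition values cannot be null!".to_string()),
        }
    }

    fn main() {
        assert_eq!(partition_key(Some("2023-10-21"), false).unwrap(), "2023-10-21");
        assert_eq!(partition_key(None, true).unwrap(), HIVE_DEFAULT_PARTITION);
        assert!(partition_key(None, false).is_err());
    }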

                        );
                    }
                },
                _ => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "It is not yet supported to write to hive partitions with datatype {}",
                        dtype
                    )))
                }
            }
        },
        _ => {
            return Err(DataFusionError::NotImplemented(format!(
                "it is not yet supported to write to hive partitions with datatype {}",