Support Partitioning Data by Dictionary Encoded String Array Types #7896

Merged · 7 commits · Oct 28, 2023
91 changes: 91 additions & 0 deletions datafusion/common/src/dfschema.rs
@@ -391,11 +391,33 @@ impl DFSchema {
})
}

/// Returns true if the two schemas have the same qualified named
/// fields with logically equivalent data types. Returns false otherwise.
///
/// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
/// equivalence checking.
pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
if self.fields().len() != other.fields().len() {
return false;
}
let self_fields = self.fields().iter();
let other_fields = other.fields().iter();
self_fields.zip(other_fields).all(|(f1, f2)| {
f1.qualifier() == f2.qualifier()
&& f1.name() == f2.name()
&& Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
})
}

/// Returns true if the two schemas have the same qualified named
/// fields with the same data types. Returns false otherwise.
///
/// This is a specialized version of Eq that ignores differences
/// in nullability and metadata.
///
/// Use [`DFSchema::logically_equivalent_names_and_types`] for a weaker
/// logical type check, which would, for example, consider a dictionary-
/// encoded UTF8 array equivalent to a plain UTF8 array.
pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
if self.fields().len() != other.fields().len() {
return false;
@@ -409,6 +431,46 @@ impl DFSchema {
})
}

/// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
/// than `datatype_is_semantically_equal` in that a `Dictionary<K, V>` type is logically
/// equal to a plain `V` type, but not semantically equal. `Dictionary<K1, V1>` is also
/// logically equal to `Dictionary<K2, V1>`.
fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
// check nested fields
match (dt1, dt2) {
(DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
v1.as_ref() == v2.as_ref()
}
(DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
(othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
(DataType::List(f1), DataType::List(f2))
| (DataType::LargeList(f1), DataType::LargeList(f2))
| (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _))
| (DataType::Map(f1, _), DataType::Map(f2, _)) => {
Self::field_is_logically_equal(f1, f2)
}
(DataType::Struct(fields1), DataType::Struct(fields2)) => {
let iter1 = fields1.iter();
let iter2 = fields2.iter();
fields1.len() == fields2.len() &&
// all fields have to be the same
iter1
.zip(iter2)
.all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
}
(DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
let iter1 = fields1.iter();
let iter2 = fields2.iter();
fields1.len() == fields2.len() &&
// all fields have to be the same
iter1
.zip(iter2)
.all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
}
_ => dt1 == dt2,
}
}

/// Returns true if two [`DataType`]s are semantically equal (same
/// name and type), ignoring both metadata and nullability.
///
@@ -448,6 +510,11 @@ impl DFSchema {
}
}

fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
f1.name() == f2.name()
&& Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
}

fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
f1.name() == f2.name()
&& Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
@@ -778,6 +845,13 @@ pub trait SchemaExt {
///
/// It works the same as [`DFSchema::equivalent_names_and_types`].
fn equivalent_names_and_types(&self, other: &Self) -> bool;

/// Returns true if the two schemas have the same qualified named
/// fields with logically equivalent data types. Returns false otherwise.
///
/// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
/// equivalence checking.
fn logically_equivalent_names_and_types(&self, other: &Self) -> bool;
}

impl SchemaExt for Schema {
@@ -797,6 +871,23 @@ impl SchemaExt for Schema {
)
})
}

fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
if self.fields().len() != other.fields().len() {
return false;
}

self.fields()
.iter()
.zip(other.fields().iter())
.all(|(f1, f2)| {
f1.name() == f2.name()
&& DFSchema::datatype_is_logically_equal(
f1.data_type(),
f2.data_type(),
)
})
}
}

#[cfg(test)]
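
For illustration, a minimal sketch of what the new check accepts that the strict one rejects. This is not part of the PR; it assumes the `SchemaExt` trait above is re-exported at the `datafusion_common` crate root:

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::SchemaExt; // assumed re-export of the trait defined above

fn main() {
    let utf8_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);
    let dict_schema = Schema::new(vec![Field::new(
        "b",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
    )]);

    // The strict check distinguishes the two physical representations...
    assert!(!utf8_schema.equivalent_names_and_types(&dict_schema));
    // ...while the logical check treats Dictionary(Int32, Utf8) as Utf8.
    assert!(utf8_schema.logically_equivalent_names_and_types(&dict_schema));
}
```
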
18 changes: 17 additions & 1 deletion datafusion/core/src/datasource/file_format/write/demux.rs
@@ -29,7 +29,7 @@ use crate::physical_plan::SendableRecordBatchStream;

use arrow_array::builder::UInt64Builder;
use arrow_array::cast::AsArray;
use arrow_array::{RecordBatch, StructArray};
use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::cast::as_string_array;
use datafusion_common::DataFusionError;
@@ -311,6 +311,22 @@ fn compute_partition_keys_by_row<'a>(
partition_values.push(array.value(i));
}
}
DataType::Dictionary(_, _) => {
downcast_dictionary_array!(
col_array => {
let array = col_array.downcast_dict::<StringArray>()
.ok_or(DataFusionError::Execution(format!("it is not yet supported to write to hive partitions with datatype {}",
dtype)))?;

for val in array.values() {
partition_values.push(
val.ok_or(DataFusionError::Execution(format!("Cannot partition by null value for column {}", col)))?
);
}
},
_ => unreachable!(),
)
}
_ => {
return Err(DataFusionError::NotImplemented(format!(
"it is not yet supported to write to hive partitions with datatype {}",
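
A standalone sketch of the same downcast pattern, here resolving a string value for every row through its key, whereas the hunk above iterates the dictionary's values directly. The helper function is hypothetical; `downcast_dictionary_array!` and `downcast_dict` are the arrow-rs utilities used in the hunk:

```rust
use arrow_array::{downcast_dictionary_array, Array, ArrayAccessor, StringArray};

/// Collect each row's logical string value from a dictionary-encoded
/// string column; returns None if the column is not Dictionary<_, Utf8>.
fn dictionary_strings(col: &dyn Array) -> Option<Vec<Option<String>>> {
    downcast_dictionary_array!(
        // `col` is rebound here to the concrete `&DictionaryArray<K>`.
        col => {
            // View Dictionary<K, Utf8> as typed &str values.
            let typed = col.downcast_dict::<StringArray>()?;
            Some(
                (0..typed.len())
                    .map(|i| typed.is_valid(i).then(|| typed.value(i).to_string()))
                    .collect(),
            )
        },
        _ => None,
    )
}
```
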
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
@@ -830,7 +830,10 @@ impl TableProvider for ListingTable {
overwrite: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
// Check that the schema of the plan matches the schema of this table.
if !self.schema().equivalent_names_and_types(&input.schema()) {
if !self
.schema()
.logically_equivalent_names_and_types(&input.schema())
{
return plan_err!(
// Return an error if schema of the input query does not match with the table schema.
"Inserting query must have the same schema with the table."
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/memory.rs
@@ -209,7 +209,10 @@ impl TableProvider for MemTable {
) -> Result<Arc<dyn ExecutionPlan>> {
// Create a physical plan from the logical plan.
// Check that the schema of the plan matches the schema of this table.
if !self.schema().equivalent_names_and_types(&input.schema()) {
if !self
.schema()
.logically_equivalent_names_and_types(&input.schema())
{
return plan_err!(
"Inserting query must have the same schema with the table."
);
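
Both providers now gate `insert_into` on the relaxed check, so a plan producing dictionary-encoded strings can target a table declared with plain Utf8 columns. The guard boils down to this pattern (a hedged sketch; the free-standing helper is hypothetical, while `plan_err!` and the error text are the ones used above):

```rust
use arrow_schema::Schema;
use datafusion_common::{plan_err, Result, SchemaExt};

/// Hypothetical helper mirroring the guard used by ListingTable and MemTable.
fn check_insert_schema(table: &Schema, input: &Schema) -> Result<()> {
    if !table.logically_equivalent_names_and_types(input) {
        // Same error the providers return when the schemas do not line up.
        return plan_err!("Inserting query must have the same schema with the table.");
    }
    Ok(())
}
```
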
38 changes: 37 additions & 1 deletion datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -40,8 +40,44 @@ STORED AS CSV
WITH HEADER ROW
LOCATION '../../testing/data/csv/aggregate_test_100.csv'

# test_insert_into

statement ok
create table dictionary_encoded_values as values
('a', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('b', arrow_cast('bar', 'Dictionary(Int32, Utf8)'));

query TTT
describe dictionary_encoded_values;
----
column1 Utf8 YES
column2 Dictionary(Int32, Utf8) YES

statement ok
CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned(
a varchar,
b varchar,
)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned'
PARTITIONED BY (b)
OPTIONS(
create_local_path 'true',
insert_mode 'append_new_files',
);

query TT
insert into dictionary_encoded_parquet_partitioned
select * from dictionary_encoded_values
----
2

query TT
select * from dictionary_encoded_parquet_partitioned order by (a);
----
a foo
b bar


# test_insert_into
statement ok
set datafusion.execution.target_partitions = 8;
