diff --git a/crates/core/src/kernel/models/schema.rs b/crates/core/src/kernel/models/schema.rs index 87532d0b66..9330b3ce43 100644 --- a/crates/core/src/kernel/models/schema.rs +++ b/crates/core/src/kernel/models/schema.rs @@ -134,6 +134,8 @@ pub struct StructField { impl Hash for StructField { fn hash(&self, state: &mut H) { self.name.hash(state); + self.data_type.hash(state); + self.nullable.hash(state); } } @@ -215,7 +217,7 @@ impl StructField { /// A struct is used to represent both the top-level schema of the table /// as well as struct columns that contain nested columns. -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)] pub struct StructType { #[serde(rename = "type")] /// The type of this struct @@ -379,7 +381,7 @@ impl<'a> IntoIterator for &'a StructType { } } -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)] #[serde(rename_all = "camelCase")] /// An array stores a variable length collection of items of some type. pub struct ArrayType { @@ -415,7 +417,7 @@ impl ArrayType { } } -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)] #[serde(rename_all = "camelCase")] /// A map stores an arbitrary length collection of key-value pairs pub struct MapType { @@ -465,7 +467,7 @@ fn default_true() -> bool { true } -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)] #[serde(rename_all = "camelCase")] /// Primitive types supported by Delta pub enum PrimitiveType { @@ -559,7 +561,7 @@ impl Display for PrimitiveType { } } -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Eq, Hash)] #[serde(untagged, rename_all = "camelCase")] /// Top level delta tdatatypes pub enum DataType { @@ -641,6 +643,7 @@ mod tests { use super::*; use serde_json; use serde_json::json; + use std::hash::DefaultHasher; #[test] fn test_serde_data_types() { @@ -866,4 +869,65 @@ mod tests { let buf = r#"{"type":"struct","fields":[{"name":"ID_D_DATE","type":"long","nullable":true,"metadata":{"delta.identity.start":1,"delta.identity.step":1,"delta.identity.allowExplicitInsert":false}},{"name":"TXT_DateKey","type":"string","nullable":true,"metadata":{}}]}"#; let _schema: StructType = serde_json::from_str(buf).expect("Failed to load"); } + + fn get_hash(field: &StructField) -> u64 { + let mut hasher = DefaultHasher::new(); + field.hash(&mut hasher); + hasher.finish() + } + + #[test] + fn test_hash_struct_field() { + // different names should result in different hashes + let field_1 = StructField::new( + "field_name_1", + DataType::Primitive(PrimitiveType::Decimal(4, 4)), + true, + ); + let field_2 = StructField::new( + "field_name_2", + DataType::Primitive(PrimitiveType::Decimal(4, 4)), + true, + ); + assert_ne!(get_hash(&field_1), get_hash(&field_2)); + + // different types should result in different hashes + let field_int = StructField::new( + "field_name", + DataType::Primitive(PrimitiveType::Integer), + true, + ); + let field_string = StructField::new( + "field_name", + DataType::Primitive(PrimitiveType::String), + true, + ); + assert_ne!(get_hash(&field_int), get_hash(&field_string)); + + // different nullability should result in different hashes + let field_true = StructField::new( + "field_name", + DataType::Primitive(PrimitiveType::Binary), + true, + ); + let field_false = StructField::new( + "field_name", + DataType::Primitive(PrimitiveType::Binary), + false, + ); + assert_ne!(get_hash(&field_true), get_hash(&field_false)); + + // case where hashes are the same + let field_1 = StructField::new( + "field_name", + DataType::Primitive(PrimitiveType::Timestamp), + true, + ); + let field_2 = StructField::new( + "field_name", + DataType::Primitive(PrimitiveType::Timestamp), + true, + ); + assert_eq!(get_hash(&field_1), get_hash(&field_2)); + } } diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs index 1ed4e1cee6..7f8e8309db 100644 --- a/crates/core/src/operations/convert_to_delta.rs +++ b/crates/core/src/operations/convert_to_delta.rs @@ -270,6 +270,13 @@ impl ConvertToDeltaBuilder { // Iterate over the parquet files. Parse partition columns, generate add actions and collect parquet file schemas let mut arrow_schemas = Vec::new(); let mut actions = Vec::new(); + // partition columns that were defined by caller and are expected to apply on this table + let mut expected_partitions: HashMap = self + .partition_schema + .clone() + .into_iter() + .map(|field| (field.name.clone(), field)) + .collect(); // A HashSet of all unique partition columns in a Parquet table let mut partition_columns = HashSet::new(); // A vector of StructField of all unique partition columns in a Parquet table @@ -290,7 +297,7 @@ impl ConvertToDeltaBuilder { .ok_or(Error::MissingPartitionSchema)?; if partition_columns.insert(key.to_string()) { - if let Some(schema) = self.partition_schema.take(key) { + if let Some(schema) = expected_partitions.remove(key) { partition_schema_fields.insert(key.to_string(), schema); } else { // Return an error if the schema of a partition column is not provided by user @@ -360,7 +367,7 @@ impl ConvertToDeltaBuilder { arrow_schemas.push(arrow_schema); } - if !self.partition_schema.is_empty() { + if !expected_partitions.is_empty() { // Partition column provided by the user does not exist in the parquet files return Err(Error::PartitionColumnNotExist(self.partition_schema)); }