From 0e7c4c5f1eec0bdb6a83bdc89ba30e22a95cc158 Mon Sep 17 00:00:00 2001 From: Xavier Lange Date: Sat, 28 Aug 2021 07:17:39 -0400 Subject: [PATCH] Parquet Derive: remove obscure feature flags, make chrono time emit converted type (#712) * remove feature flags, make timestamp emit converted types * remove tracking numbers * NaiveDateTime emits converted type * formatting * formatting --- parquet_derive/Cargo.toml | 5 -- parquet_derive/src/parquet_field.rs | 79 +++++++++++++++++++++++------ parquet_derive_test/Cargo.toml | 1 + parquet_derive_test/src/lib.rs | 6 ++- 4 files changed, 70 insertions(+), 21 deletions(-) diff --git a/parquet_derive/Cargo.toml b/parquet_derive/Cargo.toml index c374b3304ef3..a3af8e4dea52 100644 --- a/parquet_derive/Cargo.toml +++ b/parquet_derive/Cargo.toml @@ -30,11 +30,6 @@ edition = "2018" [lib] proc-macro = true -[features] -chrono = [] -bigdecimal = [] -uuid = [] - [dependencies] proc-macro2 = "1.0" quote = "1.0" diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index 6f2fa0c7d4cb..36730c7713c5 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -207,14 +207,25 @@ impl Field { }; let logical_type = self.ty.logical_type(); let repetition = self.ty.repetition(); - quote! { - fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type) - .with_logical_type(#logical_type) - .with_repetition(#repetition) - .build() - .unwrap() - .into() - ); + let converted_type = self.ty.converted_type(); + + if let Some(converted_type) = converted_type { + quote! { + fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type) + .with_logical_type(#logical_type) + .with_repetition(#repetition) + .with_converted_type(#converted_type) + .build().unwrap().into() + ) + } + } else { + quote! 
{ + fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type) + .with_logical_type(#logical_type) + .with_repetition(#repetition) + .build().unwrap().into() + ) + } } } @@ -534,6 +545,7 @@ impl Type { })) } } "NaiveDate" => quote! { Some(LogicalType::DATE(Default::default())) }, + "NaiveDateTime" => quote! { None }, "f32" | "f64" => quote! { None }, "String" | "str" => quote! { Some(LogicalType::STRING(Default::default())) }, "Uuid" => quote! { Some(LogicalType::UUID(Default::default())) }, @@ -541,6 +553,15 @@ impl Type { } } + fn converted_type(&self) -> Option<proc_macro2::TokenStream> { + let last_part = self.last_part(); + + match last_part.trim() { + "NaiveDateTime" => Some(quote! { ConvertedType::TIMESTAMP_MILLIS }), + _ => None, + } + } + fn repetition(&self) -> proc_macro2::TokenStream { match &self { Type::Option(_) => quote! { Repetition::OPTIONAL }, @@ -944,7 +965,6 @@ mod test { } #[test] - #[cfg(feature = "chrono")] fn test_chrono_timestamp_millis() { let snippet: proc_macro2::TokenStream = quote! { struct ATimestampStruct { @@ -971,7 +991,11 @@ mod test { { let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect(); let vals : Vec<_> = records.iter().filter_map(|rec| { - rec.maybe_happened.map(|inner| { inner.timestamp_millis() }) + if let Some(inner) = rec.maybe_happened { + Some(inner.timestamp_millis()) + } else { + None + } }).collect(); if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer { @@ -984,7 +1008,6 @@ mod test { } #[test] - #[cfg(feature = "chrono")] fn test_chrono_date() { let snippet: proc_macro2::TokenStream = quote! 
{ struct ATimestampStruct { @@ -1011,7 +1034,11 @@ mod test { { let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect(); let vals : Vec<_> = records.iter().filter_map(|rec| { - rec.maybe_happened.map(|inner| { inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }) + if let Some(inner) = rec.maybe_happened { + Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) + } else { + None + } }).collect(); if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer { @@ -1024,10 +1051,9 @@ mod test { } #[test] - #[cfg(feature = "uuid")] fn test_uuid() { let snippet: proc_macro2::TokenStream = quote! { - struct ATimestampStruct { + struct AUuidStruct { unique_id: uuid::Uuid, maybe_unique_id: Option<&uuid::Uuid>, } @@ -1051,7 +1077,11 @@ mod test { { let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect(); let vals : Vec<_> = records.iter().filter_map(|rec| { - rec.maybe_unique_id.map(|ref inner| { (&inner.to_string()[..]).into() }) + if let Some(ref inner) = rec.maybe_unique_id { + Some((&inner.to_string()[..]).into()) + } else { + None + } }).collect(); if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer { @@ -1062,4 +1092,23 @@ mod test { } }).to_string()); } + + #[test] + fn test_converted_type() { + let snippet: proc_macro2::TokenStream = quote! { + struct ATimeStruct { + time: chrono::NaiveDateTime, + } + }; + + let fields = extract_fields(snippet); + + let time = Field::from(&fields[0]); + + let converted_type = time.ty.converted_type(); + assert_eq!( + converted_type.unwrap().to_string(), + quote! 
{ ConvertedType::TIMESTAMP_MILLIS }.to_string() + ); + } } diff --git a/parquet_derive_test/Cargo.toml b/parquet_derive_test/Cargo.toml index 2d6881066ec3..97d2dde309b6 100644 --- a/parquet_derive_test/Cargo.toml +++ b/parquet_derive_test/Cargo.toml @@ -30,3 +30,4 @@ publish = false [dependencies] parquet = { path = "../parquet", version = "6.0.0-SNAPSHOT" } parquet_derive = { path = "../parquet_derive", version = "6.0.0-SNAPSHOT" } +chrono = "0.4.19" \ No newline at end of file diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index bc8e9147d154..2b7c060bbd5f 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -46,6 +46,7 @@ struct ACompleteRecord<'a> { pub maybe_double: Option<f64>, pub borrowed_maybe_a_string: &'a Option<String>, pub borrowed_maybe_a_str: &'a Option<&'a str>, + pub now: chrono::NaiveDateTime, } #[cfg(test)] @@ -88,8 +89,11 @@ mod tests { OPTIONAL DOUBLE maybe_double; OPTIONAL BINARY borrowed_maybe_a_string (STRING); OPTIONAL BINARY borrowed_maybe_a_str (STRING); + REQUIRED INT64 now (TIMESTAMP_MILLIS); }"; + let schema = Arc::new(parse_message_type(schema_str).unwrap()); + let a_str = "hello mother".to_owned(); let a_borrowed_string = "cool news".to_owned(); let maybe_a_string = Some("it's true, I'm a string".to_owned()); @@ -116,9 +120,9 @@ mod tests { maybe_double: Some(std::f64::MAX), borrowed_maybe_a_string: &maybe_a_string, borrowed_maybe_a_str: &maybe_a_str, + now: chrono::Utc::now().naive_local(), }]; - let schema = Arc::new(parse_message_type(schema_str).unwrap()); let generated_schema = drs.as_slice().schema().unwrap(); assert_eq!(&schema, &generated_schema);