Skip to content

Commit

Permalink
Parquet Derive: remove obscure feature flags, make chrono time emit c…
Browse files Browse the repository at this point in the history
…onverted type (#712)

* remove feature flags, make timestamp emit converted types

* remove tracking numbers

* NaiveDateTime emits converted type

* formatting

* formatting
  • Loading branch information
xrl authored Aug 28, 2021
1 parent 5a12d97 commit 0e7c4c5
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 21 deletions.
5 changes: 0 additions & 5 deletions parquet_derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ edition = "2018"
[lib]
proc-macro = true

[features]
chrono = []
bigdecimal = []
uuid = []

[dependencies]
proc-macro2 = "1.0"
quote = "1.0"
Expand Down
79 changes: 64 additions & 15 deletions parquet_derive/src/parquet_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,25 @@ impl Field {
};
let logical_type = self.ty.logical_type();
let repetition = self.ty.repetition();
quote! {
fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
.with_logical_type(#logical_type)
.with_repetition(#repetition)
.build()
.unwrap()
.into()
);
let converted_type = self.ty.converted_type();

if let Some(converted_type) = converted_type {
quote! {
fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
.with_logical_type(#logical_type)
.with_repetition(#repetition)
.with_converted_type(#converted_type)
.build().unwrap().into()
)
}
} else {
quote! {
fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
.with_logical_type(#logical_type)
.with_repetition(#repetition)
.build().unwrap().into()
)
}
}
}

Expand Down Expand Up @@ -534,13 +545,23 @@ impl Type {
})) }
}
"NaiveDate" => quote! { Some(LogicalType::DATE(Default::default())) },
"NaiveDateTime" => quote! { None },
"f32" | "f64" => quote! { None },
"String" | "str" => quote! { Some(LogicalType::STRING(Default::default())) },
"Uuid" => quote! { Some(LogicalType::UUID(Default::default())) },
f => unimplemented!("{} currently is not supported", f),
}
}

/// Returns the tokens for the Parquet converted type to attach to this
/// field's schema, if there is one.
///
/// Only a type whose trimmed last part is `NaiveDateTime` yields a value
/// (`ConvertedType::TIMESTAMP_MILLIS`); every other type yields `None`.
fn converted_type(&self) -> Option<proc_macro2::TokenStream> {
    let type_name = self.last_part();

    if type_name.trim() == "NaiveDateTime" {
        Some(quote! { ConvertedType::TIMESTAMP_MILLIS })
    } else {
        None
    }
}

fn repetition(&self) -> proc_macro2::TokenStream {
match &self {
Type::Option(_) => quote! { Repetition::OPTIONAL },
Expand Down Expand Up @@ -944,7 +965,6 @@ mod test {
}

#[test]
#[cfg(feature = "chrono")]
fn test_chrono_timestamp_millis() {
let snippet: proc_macro2::TokenStream = quote! {
struct ATimestampStruct {
Expand All @@ -971,7 +991,11 @@ mod test {
{
let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
let vals : Vec<_> = records.iter().filter_map(|rec| {
rec.maybe_happened.map(|inner| { inner.timestamp_millis() })
if let Some(inner) = rec.maybe_happened {
Some(inner.timestamp_millis())
} else {
None
}
}).collect();

if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
Expand All @@ -984,7 +1008,6 @@ mod test {
}

#[test]
#[cfg(feature = "chrono")]
fn test_chrono_date() {
let snippet: proc_macro2::TokenStream = quote! {
struct ATimestampStruct {
Expand All @@ -1011,7 +1034,11 @@ mod test {
{
let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
let vals : Vec<_> = records.iter().filter_map(|rec| {
rec.maybe_happened.map(|inner| { inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 })
if let Some(inner) = rec.maybe_happened {
Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)
} else {
None
}
}).collect();

if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
Expand All @@ -1024,10 +1051,9 @@ mod test {
}

#[test]
#[cfg(feature = "uuid")]
fn test_uuid() {
let snippet: proc_macro2::TokenStream = quote! {
struct ATimestampStruct {
struct AUuidStruct {
unique_id: uuid::Uuid,
maybe_unique_id: Option<&uuid::Uuid>,
}
Expand All @@ -1051,7 +1077,11 @@ mod test {
{
let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect();
let vals : Vec<_> = records.iter().filter_map(|rec| {
rec.maybe_unique_id.map(|ref inner| { (&inner.to_string()[..]).into() })
if let Some(ref inner) = rec.maybe_unique_id {
Some((&inner.to_string()[..]).into())
} else {
None
}
}).collect();

if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
Expand All @@ -1062,4 +1092,23 @@ mod test {
}
}).to_string());
}

#[test]
fn test_converted_type() {
    // A struct with a single `chrono::NaiveDateTime` field.
    let snippet: proc_macro2::TokenStream = quote! {
        struct ATimeStruct {
            time: chrono::NaiveDateTime,
        }
    };

    let fields = extract_fields(snippet);
    let time = Field::from(&fields[0]);

    // The field's type must report a converted type, and it must be
    // TIMESTAMP_MILLIS; compare the token streams through their string form.
    let actual = time.ty.converted_type().unwrap().to_string();
    let expected = quote! { ConvertedType::TIMESTAMP_MILLIS }.to_string();
    assert_eq!(actual, expected);
}
}
1 change: 1 addition & 0 deletions parquet_derive_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ publish = false
[dependencies]
parquet = { path = "../parquet", version = "6.0.0-SNAPSHOT" }
parquet_derive = { path = "../parquet_derive", version = "6.0.0-SNAPSHOT" }
chrono = "0.4.19"
6 changes: 5 additions & 1 deletion parquet_derive_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct ACompleteRecord<'a> {
pub maybe_double: Option<f64>,
pub borrowed_maybe_a_string: &'a Option<String>,
pub borrowed_maybe_a_str: &'a Option<&'a str>,
pub now: chrono::NaiveDateTime,
}

#[cfg(test)]
Expand Down Expand Up @@ -88,8 +89,11 @@ mod tests {
OPTIONAL DOUBLE maybe_double;
OPTIONAL BINARY borrowed_maybe_a_string (STRING);
OPTIONAL BINARY borrowed_maybe_a_str (STRING);
REQUIRED INT64 now (TIMESTAMP_MILLIS);
}";

let schema = Arc::new(parse_message_type(schema_str).unwrap());

let a_str = "hello mother".to_owned();
let a_borrowed_string = "cool news".to_owned();
let maybe_a_string = Some("it's true, I'm a string".to_owned());
Expand All @@ -116,9 +120,9 @@ mod tests {
maybe_double: Some(std::f64::MAX),
borrowed_maybe_a_string: &maybe_a_string,
borrowed_maybe_a_str: &maybe_a_str,
now: chrono::Utc::now().naive_local(),
}];

let schema = Arc::new(parse_message_type(schema_str).unwrap());
let generated_schema = drs.as_slice().schema().unwrap();

assert_eq!(&schema, &generated_schema);
Expand Down

0 comments on commit 0e7c4c5

Please sign in to comment.