From b26f680d649fb99349edd34b4dd7ec88791af42a Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Thu, 13 Jun 2024 07:38:37 -0400
Subject: [PATCH 1/3] refactor: fetch statistics for a given ParquetMetaData
 (#10880)

* refactor: fetch statistics for a given ParquetMetaData

* test: add tests for fetch_statistics_from_parquet_meta

* Rename function and improve docs

* Simplify the test

---------

Co-authored-by: Andrew Lamb
---
 .../src/datasource/file_format/parquet.rs | 73 +++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 99c38d3f0980..572904254fd7 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -455,6 +455,8 @@ async fn fetch_schema(
 }
 
 /// Read and parse the statistics of the Parquet file at location `path`
+///
+/// See [`statistics_from_parquet_meta`] for more details
 async fn fetch_statistics(
     store: &dyn ObjectStore,
     table_schema: SchemaRef,
@@ -462,6 +464,17 @@ async fn fetch_statistics(
     metadata_size_hint: Option<usize>,
 ) -> Result<Statistics> {
     let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
+    statistics_from_parquet_meta(&metadata, table_schema).await
+}
+
+/// Convert statistics in [`ParquetMetaData`] into [`Statistics`]
+///
+/// The statistics are calculated for each column in the table schema
+/// using the row group statistics in the parquet metadata.
+pub async fn statistics_from_parquet_meta(
+    metadata: &ParquetMetaData,
+    table_schema: SchemaRef,
+) -> Result<Statistics> {
     let file_metadata = metadata.file_metadata();
 
     let file_schema = parquet_to_arrow_schema(
@@ -1402,6 +1415,66 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_statistics_from_parquet_metadata() -> Result<()> {
+        // Data for column c1: ["Foo", null, "bar"]
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
+
+        // Data for column c2: [1, 2, null]
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+
+        // Use store_parquet to write each batch to its own file
+        // . batch1 written into first file and includes:
+        //   - column c1 that has 3 rows with one null. Stats min and max of the string column are missing for this test even though the column has values
+        // . batch2 written into second file and includes:
+        //   - column c2 that has 3 rows with one null. Stats min and max of the int column are available: 1 and 2 respectively
+        let store = Arc::new(LocalFileSystem::new()) as _;
+        let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?;
+
+        let state = SessionContext::new().state();
+        let format = ParquetFormat::default();
+        let schema = format.infer_schema(&state, &store, &files).await.unwrap();
+
+        let null_i64 = ScalarValue::Int64(None);
+        let null_utf8 = ScalarValue::Utf8(None);
+
+        // Fetch statistics for first file
+        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
+        let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
+        //
+        assert_eq!(stats.num_rows, Precision::Exact(3));
+        // column c1
+        let c1_stats = &stats.column_statistics[0];
+        assert_eq!(c1_stats.null_count, Precision::Exact(1));
+        assert_eq!(c1_stats.max_value, Precision::Absent);
+        assert_eq!(c1_stats.min_value, Precision::Absent);
+        // column c2: missing from the file so the table treats all 3 rows as null
+        let c2_stats = &stats.column_statistics[1];
+        assert_eq!(c2_stats.null_count, Precision::Exact(3));
+        assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone()));
+        assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
+
+        // Fetch statistics for second file
+        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[1], None).await?;
+        let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
+        assert_eq!(stats.num_rows, Precision::Exact(3));
+        // column c1: missing from the file so the table treats all 3 rows as null
+        let c1_stats = &stats.column_statistics[0];
+        assert_eq!(c1_stats.null_count, Precision::Exact(3));
+        assert_eq!(c1_stats.max_value, Precision::Exact(null_utf8.clone()));
+        assert_eq!(c1_stats.min_value, Precision::Exact(null_utf8.clone()));
+        // column c2
+        let c2_stats = &stats.column_statistics[1];
+        assert_eq!(c2_stats.null_count, Precision::Exact(1));
+        assert_eq!(c2_stats.max_value, Precision::Exact(2i64.into()));
+        assert_eq!(c2_stats.min_value, Precision::Exact(1i64.into()));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
         let config = SessionConfig::new().with_batch_size(2);
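Note (not part of the patch): splitting `fetch_statistics` into a fetch step and a conversion step means a caller that already holds a `ParquetMetaData` can derive `Statistics` without a second object-store read. A minimal usage sketch, assuming the public crate paths of this era; the wrapper name `metadata_and_stats` is hypothetical:

```rust
// Hedged sketch: `metadata_and_stats` is a hypothetical helper; the two
// functions it calls are the ones shown in the diff above. Import paths
// are assumed from the datafusion / object_store / parquet crates.
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{Result, Statistics};
use datafusion::datasource::file_format::parquet::{
    fetch_parquet_metadata, statistics_from_parquet_meta,
};
use object_store::{ObjectMeta, ObjectStore};
use parquet::file::metadata::ParquetMetaData;

async fn metadata_and_stats(
    store: &dyn ObjectStore,
    file: &ObjectMeta,
    table_schema: SchemaRef,
) -> Result<(ParquetMetaData, Statistics)> {
    // one footer read from object storage...
    let metadata = fetch_parquet_metadata(store, file, None).await?;
    // ...then statistics are derived from the in-memory metadata,
    // and the metadata stays available for other consumers
    let stats = statistics_from_parquet_meta(&metadata, table_schema).await?;
    Ok((metadata, stats))
}
```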
From 21c0f6e38af5ac66b13431a70b16883f6460b069 Mon Sep 17 00:00:00 2001
From: Marvin Lanhenke <62298609+marvinlanhenke@users.noreply.github.com>
Date: Fri, 21 Jun 2024 18:18:14 +0200
Subject: [PATCH 2/3] Consider timezones with `UTC` and `+00:00` to be the same
 (#10960)

* feat: add temporal_coercion check

* fix: add return stmt

* chore: add slts

* fix: remove println

* Update datafusion/expr/src/type_coercion/binary.rs

---------

Co-authored-by: Andrew Lamb
---
 datafusion/expr/src/type_coercion/binary.rs | 14 +++++++----
 .../sqllogictest/test_files/timestamps.slt  | 23 +++++++++++++++++++
 2 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs
index 615bb3ac568c..ea9d0c2fe72e 100644
--- a/datafusion/expr/src/type_coercion/binary.rs
+++ b/datafusion/expr/src/type_coercion/binary.rs
@@ -1050,12 +1050,16 @@ fn temporal_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
             let tz = match (lhs_tz, rhs_tz) {
-                // can't cast across timezones
                 (Some(lhs_tz), Some(rhs_tz)) => {
-                    if lhs_tz != rhs_tz {
-                        return None;
-                    } else {
-                        Some(lhs_tz.clone())
+                    match (lhs_tz.as_ref(), rhs_tz.as_ref()) {
+                        // UTC and "+00:00" are the same by definition. Most other timezones
+                        // do not have a 1-1 mapping between timezone and an offset from UTC
+                        ("UTC", "+00:00") | ("+00:00", "UTC") => Some(lhs_tz.clone()),
+                        (lhs, rhs) if lhs == rhs => Some(lhs_tz.clone()),
+                        // can't cast across timezones
+                        _ => {
+                            return None;
+                        }
                     }
                 }
                 (Some(lhs_tz), None) => Some(lhs_tz.clone()),
diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt
index 7d5d601bbfdd..96d846d449e1 100644
--- a/datafusion/sqllogictest/test_files/timestamps.slt
+++ b/datafusion/sqllogictest/test_files/timestamps.slt
@@ -2801,3 +2801,26 @@ query B
 select current_time = current_time;
 ----
 true
+
+# Test temporal coercion for UTC
+query ?
+select arrow_cast('2024-06-17T11:00:00', 'Timestamp(Nanosecond, Some("UTC"))') - arrow_cast('2024-06-17T12:00:00', 'Timestamp(Microsecond, Some("UTC"))');
+----
+0 days -1 hours 0 mins 0.000000 secs
+
+query ?
+select arrow_cast('2024-06-17T13:00:00', 'Timestamp(Nanosecond, Some("+00:00"))') - arrow_cast('2024-06-17T12:00:00', 'Timestamp(Microsecond, Some("UTC"))');
+----
+0 days 1 hours 0 mins 0.000000 secs
+
+query ?
+select arrow_cast('2024-06-17T13:00:00', 'Timestamp(Nanosecond, Some("UTC"))') - arrow_cast('2024-06-17T12:00:00', 'Timestamp(Microsecond, Some("+00:00"))');
+----
+0 days 1 hours 0 mins 0.000000 secs
+
+# not supported: coercion across timezones
+query error
+select arrow_cast('2024-06-17T13:00:00', 'Timestamp(Nanosecond, Some("UTC"))') - arrow_cast('2024-06-17T12:00:00', 'Timestamp(Microsecond, Some("+01:00"))');
+
+query error
+select arrow_cast('2024-06-17T13:00:00', 'Timestamp(Nanosecond, Some("+00:00"))') - arrow_cast('2024-06-17T12:00:00', 'Timestamp(Microsecond, Some("+01:00"))');
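Note (not part of the patch): the new decision table is small enough to state standalone. A minimal sketch using plain string timezones; `coerced_timezone` is a hypothetical name, and the real code matches on the `Arc<str>` stored inside `DataType::Timestamp`:

```rust
/// Hedged sketch of the rule added above: "UTC" and the fixed offset
/// "+00:00" are interchangeable, identical strings coerce trivially,
/// and everything else is rejected rather than silently cast.
fn coerced_timezone(lhs: &str, rhs: &str) -> Option<String> {
    match (lhs, rhs) {
        // UTC has no DST, so it denotes exactly the fixed offset +00:00
        ("UTC", "+00:00") | ("+00:00", "UTC") => Some(lhs.to_string()),
        // the same timezone string on both sides is trivially fine
        (l, r) if l == r => Some(l.to_string()),
        // e.g. "UTC" vs "+01:00": can't cast across timezones
        _ => None,
    }
}

fn main() {
    assert_eq!(coerced_timezone("UTC", "+00:00").as_deref(), Some("UTC"));
    assert_eq!(coerced_timezone("+00:00", "+00:00").as_deref(), Some("+00:00"));
    assert_eq!(coerced_timezone("UTC", "+01:00"), None);
}
```

This is also why only the `UTC`/`+00:00` pair can be equated safely: a named zone such as `Europe/London` maps to different offsets over the course of a year, so it never equals any single fixed offset.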
From 72beef601c153d4186c50c407f2fb0ae9a5846a8 Mon Sep 17 00:00:00 2001
From: Eduardo Vega
Date: Sun, 23 Jun 2024 04:14:40 -0600
Subject: [PATCH 3/3] Support dictionary data type in array_to_string (#10908)

* Support dictionary data type in array_to_string

* Fix import

* Some tests

* Update datafusion/functions-array/src/string.rs

Co-authored-by: Alex Huang

* Add some tests showing incorrect results

* Get logical array

* apply rust fmt

* Simplify implementation, avoid panics

---------

Co-authored-by: Alex Huang
Co-authored-by: Andrew Lamb
---
 datafusion/functions-array/src/string.rs     | 29 ++++++++++--
 datafusion/sqllogictest/test_files/array.slt | 48 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 4 deletions(-)

diff --git a/datafusion/functions-array/src/string.rs b/datafusion/functions-array/src/string.rs
index 04832b4b1259..d02c863db8b7 100644
--- a/datafusion/functions-array/src/string.rs
+++ b/datafusion/functions-array/src/string.rs
@@ -26,12 +26,15 @@ use arrow::array::{
 use arrow::datatypes::{DataType, Field};
 use datafusion_expr::TypeSignature;
 
-use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
 
 use std::any::{type_name, Any};
 
 use crate::utils::{downcast_arg, make_scalar_function};
-use arrow_schema::DataType::{FixedSizeList, LargeList, LargeUtf8, List, Null, Utf8};
+use arrow::compute::cast;
+use arrow_schema::DataType::{
+    Dictionary, FixedSizeList, LargeList, LargeUtf8, List, Null, Utf8,
+};
 use datafusion_common::cast::{
     as_generic_string_array, as_large_list_array, as_list_array, as_string_array,
 };
@@ -76,7 +79,7 @@ macro_rules! call_array_function {
             DataType::UInt16 => array_function!(UInt16Array),
             DataType::UInt32 => array_function!(UInt32Array),
             DataType::UInt64 => array_function!(UInt64Array),
-            _ => unreachable!(),
+            dt => not_impl_err!("Unsupported data type in array_to_string: {dt}"),
         }
     };
     ($DATATYPE:expr, $INCLUDE_LIST:expr) => {{
@@ -95,7 +98,7 @@ macro_rules! call_array_function {
             DataType::UInt16 => array_function!(UInt16Array),
             DataType::UInt32 => array_function!(UInt32Array),
             DataType::UInt64 => array_function!(UInt64Array),
-            _ => unreachable!(),
+            dt => not_impl_err!("Unsupported data type in array_to_string: {dt}"),
         }
     }};
 }
@@ -245,6 +248,8 @@ pub(super) fn array_to_string_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
         with_null_string = true;
     }
 
+    /// Creates a single string from a single element of a ListArray (which is
+    /// itself another Array)
     fn compute_array_to_string(
         arg: &mut String,
         arr: ArrayRef,
@@ -281,6 +286,22 @@ pub(super) fn array_to_string_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
 
                 Ok(arg)
             }
+            Dictionary(_key_type, value_type) => {
+                // Call cast to unwrap the dictionary. This could be optimized if we wanted
+                // to accept the overhead of extra code
+                let values = cast(&arr, value_type.as_ref()).map_err(|e| {
+                    DataFusionError::from(e).context(
+                        "Casting dictionary to values in compute_array_to_string",
+                    )
+                })?;
+                compute_array_to_string(
+                    arg,
+                    values,
+                    delimiter,
+                    null_string,
+                    with_null_string,
+                )
+            }
             Null => Ok(arg),
             data_type => {
                 macro_rules! array_function {
diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt
index c62c1ce29c06..1612adc643d9 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -3769,6 +3769,54 @@ select array_to_string(make_array(), ',')
 ----
 (empty)
 
+# array to string dictionary
+statement ok
+CREATE TABLE table1 AS VALUES
+  (1, 'foo'),
+  (3, 'bar'),
+  (1, 'foo'),
+  (2, NULL),
+  (NULL, 'baz')
+  ;
+
+# expect 1-3-1-2 (dictionary values should be repeated)
+query T
+SELECT array_to_string(array_agg(column1),'-')
+FROM (
+  SELECT arrow_cast(column1, 'Dictionary(Int32, Int32)') as column1
+  FROM table1
+);
+----
+1-3-1-2
+
+# expect foo,bar,foo,baz (dictionary values should be repeated)
+query T
+SELECT array_to_string(array_agg(column2),',')
+FROM (
+  SELECT arrow_cast(column2, 'Dictionary(Int64, Utf8)') as column2
+  FROM table1
+);
+----
+foo,bar,foo,baz
+
+# Expect only values that are in the group
+query I?T
+SELECT column1, array_agg(column2), array_to_string(array_agg(column2),',')
+FROM (
+  SELECT column1, arrow_cast(column2, 'Dictionary(Int32, Utf8)') as column2
+  FROM table1
+)
+GROUP BY column1
+ORDER BY column1;
+----
+1 [foo, foo] foo,foo
+2 [] (empty)
+3 [bar] bar
+NULL [baz] baz
+
+statement ok
+drop table table1;
+
 
 ## array_union (aliases: `list_union`)
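Note (not part of the patch): the fix relies on `arrow::compute::cast` to expand a dictionary array into its value type before stringifying, which is why repeated dictionary values come out repeated (`1-3-1-2`, `foo,bar,foo,baz`) rather than deduplicated. A minimal standalone sketch of that behavior, assuming only the `arrow` crate; the array contents are illustrative:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray, StringArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Int32Type};

fn main() {
    // Dictionary storage: values ["foo", "bar"], keys [0, 1, 0]
    let dict: DictionaryArray<Int32Type> =
        vec!["foo", "bar", "foo"].into_iter().collect();
    let arr: ArrayRef = Arc::new(dict);

    // Casting to the value type materializes one value per key, so the
    // logical (repeated) sequence is recovered: ["foo", "bar", "foo"]
    let flat = cast(&arr, &DataType::Utf8).unwrap();
    let flat = flat.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(flat.len(), 3);
    assert_eq!(flat.value(0), "foo");
    assert_eq!(flat.value(1), "bar");
    assert_eq!(flat.value(2), "foo");
}
```

Delegating to `cast` keeps `compute_array_to_string` agnostic of key widths and null handling, at the cost of materializing the expanded values, the trade-off the in-code comment acknowledges.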