
Commit
Merge commit '8f718dd3ce291c9f5688144ca6c9d7d854dc4b0b' into chunchun/update-df-june-week-2-2
appletreeisyellow committed Jun 24, 2024
2 parents a6d5f8a + 8f718dd commit 0165366
Showing 71 changed files with 1,624 additions and 1,756 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -52,7 +52,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.73"
rust-version = "1.75"
version = "39.0.0"

[workspace.dependencies]
@@ -107,7 +107,7 @@ doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
half = { version = "2.2.1", default-features = false }
hashbrown = { version = "0.14", features = ["raw"] }
hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
homepage = "https://datafusion.apache.org"
repository = "https://github.com/apache/datafusion"
# Specify MSRV here as `cargo msrv` doesn't support workspace version
rust-version = "1.73"
rust-version = "1.75"
readme = "README.md"

[dependencies]
2 changes: 1 addition & 1 deletion datafusion/common/src/dfschema.rs
@@ -666,7 +666,7 @@ impl DFSchema {
/// than datatype_is_semantically_equal in that a Dictionary<K,V> type is logically
/// equal to a plain V type, but not semantically equal. Dictionary<K1, V1> is also
/// logically equal to Dictionary<K2, V1>.
fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
// check nested fields
match (dt1, dt2) {
(DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
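The function above is now `pub`, exposing the "logically equal" comparison described in its doc-comment. A minimal, self-contained sketch of that rule (not DataFusion's exact implementation; it assumes only the `arrow_schema` crate):

```rust
use arrow_schema::DataType;

// Dictionary<K, V> is treated as logically equal to plain V, and the key type
// K is ignored, mirroring the doc-comment above.
fn logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
    match (dt1, dt2) {
        (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => logically_equal(v1, v2),
        (DataType::Dictionary(_, v1), _) => v1.as_ref() == dt2,
        (_, DataType::Dictionary(_, v2)) => dt1 == v2.as_ref(),
        _ => dt1 == dt2,
    }
}

fn main() {
    let dict_utf8 = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let other_keys = DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    assert!(logically_equal(&dict_utf8, &DataType::Utf8)); // Dictionary<K, V> vs plain V
    assert!(logically_equal(&dict_utf8, &other_keys));     // Dictionary<K1, V> vs Dictionary<K2, V>
    assert!(!logically_equal(&dict_utf8, &DataType::Int32));
}
```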
14 changes: 7 additions & 7 deletions datafusion/common/src/utils/mod.rs
@@ -354,7 +354,7 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
pub fn array_into_list_array(arr: ArrayRef) -> ListArray {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
ListArray::new(
Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
Arc::new(Field::new_list_field(arr.data_type().to_owned(), true)),
offsets,
arr,
None,
@@ -366,7 +366,7 @@ pub fn array_into_list_array(arr: ArrayRef) -> ListArray {
pub fn array_into_large_list_array(arr: ArrayRef) -> LargeListArray {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
LargeListArray::new(
Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
Arc::new(Field::new_list_field(arr.data_type().to_owned(), true)),
offsets,
arr,
None,
@@ -379,7 +379,7 @@ pub fn array_into_fixed_size_list_array(
) -> FixedSizeListArray {
let list_size = list_size as i32;
FixedSizeListArray::new(
Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
Arc::new(Field::new_list_field(arr.data_type().to_owned(), true)),
list_size,
arr,
None,
@@ -420,7 +420,7 @@ pub fn arrays_into_list_array(
let data_type = arr[0].data_type().to_owned();
let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
Ok(ListArray::new(
Arc::new(Field::new("item", data_type, true)),
Arc::new(Field::new_list_field(data_type, true)),
OffsetBuffer::from_lengths(lens),
arrow::compute::concat(values.as_slice())?,
None,
@@ -435,7 +435,7 @@ pub fn arrays_into_list_array(
/// use datafusion_common::utils::base_type;
/// use std::sync::Arc;
///
/// let data_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
/// let data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
/// assert_eq!(base_type(&data_type), DataType::Int32);
///
/// let data_type = DataType::Int32;
@@ -458,10 +458,10 @@ pub fn base_type(data_type: &DataType) -> DataType {
/// use datafusion_common::utils::coerced_type_with_base_type_only;
/// use std::sync::Arc;
///
/// let data_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
/// let data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
/// let base_type = DataType::Float64;
/// let coerced_type = coerced_type_with_base_type_only(&data_type, &base_type);
/// assert_eq!(coerced_type, DataType::List(Arc::new(Field::new("item", DataType::Float64, true))));
/// assert_eq!(coerced_type, DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))));
pub fn coerced_type_with_base_type_only(
data_type: &DataType,
base_type: &DataType,
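For context, `Field::new_list_field` is shorthand for the hard-coded `"item"` field name used previously, so the substitutions above should be behavior-preserving. A small sketch, assuming only the `arrow_schema` crate:

```rust
use arrow_schema::{DataType, Field};

fn main() {
    // The old spelling with the conventional "item" name...
    let old_style = Field::new("item", DataType::Int32, true);
    // ...and the helper this diff switches to.
    let new_style = Field::new_list_field(DataType::Int32, true);
    assert_eq!(old_style, new_style);
}
```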
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -30,7 +30,7 @@ authors = { workspace = true }
# Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
# "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
# https://github.com/foresterre/cargo-msrv/issues/590
rust-version = "1.73"
rust-version = "1.75"

[lints]
workspace = true
13 changes: 5 additions & 8 deletions datafusion/core/src/dataframe/mod.rs
@@ -50,12 +50,11 @@ use datafusion_common::{
};
use datafusion_expr::lit;
use datafusion_expr::{
avg, count, max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown,
avg, max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown,
UNNAMED_TABLE,
};
use datafusion_expr::{case, is_null};
use datafusion_functions_aggregate::expr_fn::sum;
use datafusion_functions_aggregate::expr_fn::{median, stddev};
use datafusion_functions_aggregate::expr_fn::{count, median, stddev, sum};

use async_trait::async_trait;

@@ -854,10 +853,7 @@ impl DataFrame {
/// ```
pub async fn count(self) -> Result<usize> {
let rows = self
.aggregate(
vec![],
vec![datafusion_expr::count(Expr::Literal(COUNT_STAR_EXPANSION))],
)?
.aggregate(vec![], vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?
.collect()
.await?;
let len = *rows
@@ -1594,9 +1590,10 @@ mod tests {
use datafusion_common::{Constraint, Constraints};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::{
array_agg, cast, count_distinct, create_udf, expr, lit, BuiltInWindowFunction,
array_agg, cast, create_udf, expr, lit, BuiltInWindowFunction,
ScalarFunctionImplementation, Volatility, WindowFrame, WindowFunctionDefinition,
};
use datafusion_functions_aggregate::expr_fn::count_distinct;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

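The import changes above move `count` (and `count_distinct`) from `datafusion_expr` to `datafusion_functions_aggregate::expr_fn`. A hedged sketch of the call-site impact; the column name `"a"` and the incoming `DataFrame` are illustrative only:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;
// Previously these came from `datafusion_expr`; after this change they are
// imported from the aggregate-functions crate instead.
use datafusion_functions_aggregate::expr_fn::{count, count_distinct};

fn count_rows(df: DataFrame) -> Result<DataFrame> {
    // Roughly equivalent to `SELECT count(a), count(DISTINCT a) FROM ...`
    df.aggregate(vec![], vec![count(col("a")), count_distinct(col("a"))])
}
```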
49 changes: 39 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
@@ -547,20 +547,49 @@ impl ListingOptions {
}
}

/// Reads data from one or more files via an
/// [`ObjectStore`]. For example, from
/// local files or objects from AWS S3. Implements [`TableProvider`],
/// a DataFusion data source.
/// Reads data from one or more files as a single table.
///
/// # Features
/// Implements [`TableProvider`], a DataFusion data source. The files are read
/// using an [`ObjectStore`] instance, for example from local files or objects
/// from AWS S3.
///
/// 1. Merges schemas if the files have compatible but not identical schemas
/// For example, given the `table1` directory (or object store prefix)
///
/// 2. Hive-style partitioning support, where a path such as
/// `/files/date=1/1/2022/data.parquet` is injected as a `date` column.
/// ```text
/// table1
/// ├── file1.parquet
/// └── file2.parquet
/// ```
///
/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as
/// a single table, merging the schemas if the files have compatible but not
/// identical schemas.
///
/// Given the `table2` directory (or object store prefix)
///
/// ```text
/// table2
/// ├── date=2024-06-01
/// │ ├── file3.parquet
/// │ └── file4.parquet
/// └── date=2024-06-02
/// └── file5.parquet
/// ```
///
/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and
/// `file5.parquet` as a single table, again merging schemas if necessary.
///
/// Given the Hive-style partitioning structure (e.g., directories named
/// `date=2024-06-01` and `date=2024-06-02`), `ListingTable` also adds a `date`
/// column when reading the table:
/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01`
/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`.
///
/// If the query has a predicate like `WHERE date = '2024-06-01'`
/// only the corresponding directory will be read.
///
/// 3. Projection pushdown for formats that support it such as such as
/// Parquet
/// `ListingTable` also supports filter and projection pushdown for formats that
/// support it, such as Parquet.
///
/// # Example
///
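As a reader aid, here is a minimal sketch of building a `ListingTable` over a Hive-partitioned layout like `table2` above, based on the documented `ListingTable` API; the path, the `date` column type, and the session setup are illustrative assumptions:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::*;

async fn register_table2(ctx: &SessionContext) -> Result<()> {
    // Hypothetical location of the `table2` directory from the docs above.
    let table_path = ListingTableUrl::parse("file:///data/table2/")?;
    // Declare the Hive-style partition column so `date` appears in the schema
    // and `WHERE date = '2024-06-01'` can prune whole directories.
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        .with_table_partition_cols(vec![("date".to_string(), DataType::Utf8)]);
    let file_schema = options.infer_schema(&ctx.state(), &table_path).await?;
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .with_schema(file_schema);
    ctx.register_table("table2", Arc::new(ListingTable::try_new(config)?))?;
    Ok(())
}
```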
50 changes: 28 additions & 22 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! [`min_statistics`] and [`max_statistics`] convert statistics in parquet format to arrow [`ArrayRef`].
//! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`].
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

@@ -542,16 +542,21 @@ pub(crate) fn parquet_column<'a>(
Some((parquet_idx, field))
}

/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
pub(crate) fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an
/// [`ArrayRef`]
///
/// This is an internal helper -- see [`StatisticsConverter`] for public API
fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
) -> Result<ArrayRef> {
get_statistics!(Min, data_type, iterator)
}

/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
pub(crate) fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
///
/// This is an internal helper -- see [`StatisticsConverter`] for public API
fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
) -> Result<ArrayRef> {
@@ -1425,17 +1430,18 @@ mod test {
assert_eq!(idx, 2);

let row_groups = metadata.row_groups();
let iter = row_groups.iter().map(|x| x.column(idx).statistics());
let converter =
StatisticsConverter::try_new("int_col", &schema, parquet_schema).unwrap();

let min = min_statistics(&DataType::Int32, iter.clone()).unwrap();
let min = converter.row_group_mins(row_groups.iter()).unwrap();
assert_eq!(
&min,
&expected_min,
"Min. Statistics\n\n{}\n\n",
DisplayStats(row_groups)
);

let max = max_statistics(&DataType::Int32, iter).unwrap();
let max = converter.row_group_maxes(row_groups.iter()).unwrap();
assert_eq!(
&max,
&expected_max,
@@ -1623,22 +1629,23 @@
continue;
}

let (idx, f) =
parquet_column(parquet_schema, &schema, field.name()).unwrap();
assert_eq!(f, field);
let converter =
StatisticsConverter::try_new(field.name(), &schema, parquet_schema)
.unwrap();

let iter = row_groups.iter().map(|x| x.column(idx).statistics());
let min = min_statistics(f.data_type(), iter.clone()).unwrap();
assert_eq!(converter.arrow_field, field.as_ref());

let mins = converter.row_group_mins(row_groups.iter()).unwrap();
assert_eq!(
&min,
&mins,
&expected_min,
"Min. Statistics\n\n{}\n\n",
DisplayStats(row_groups)
);

let max = max_statistics(f.data_type(), iter).unwrap();
let maxes = converter.row_group_maxes(row_groups.iter()).unwrap();
assert_eq!(
&max,
&maxes,
&expected_max,
"Max. Statistics\n\n{}\n\n",
DisplayStats(row_groups)
@@ -1705,7 +1712,7 @@
self
}

/// Reads the specified parquet file and validates that the exepcted min/max
/// Reads the specified parquet file and validates that the expected min/max
/// values for the specified columns are as expected.
fn run(self) {
let path = PathBuf::from(parquet_test_data()).join(self.file_name);
@@ -1723,14 +1730,13 @@
expected_max,
} = expected_column;

let (idx, field) =
parquet_column(parquet_schema, arrow_schema, name).unwrap();

let iter = row_groups.iter().map(|x| x.column(idx).statistics());
let actual_min = min_statistics(field.data_type(), iter.clone()).unwrap();
let converter =
StatisticsConverter::try_new(name, arrow_schema, parquet_schema)
.unwrap();
let actual_min = converter.row_group_mins(row_groups.iter()).unwrap();
assert_eq!(&expected_min, &actual_min, "column {name}");

let actual_max = max_statistics(field.data_type(), iter).unwrap();
let actual_max = converter.row_group_maxes(row_groups.iter()).unwrap();
assert_eq!(&expected_max, &actual_max, "column {name}");
}
}
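For orientation, the tests above now go through `StatisticsConverter` rather than the now-private `min_statistics`/`max_statistics` helpers. A hedged sketch of that flow; the import path for `StatisticsConverter` is an assumption (the type is defined in this file), while the method names follow the diff:

```rust
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::Schema;
// Assumed re-export path; StatisticsConverter lives in
// datafusion/core/src/datasource/physical_plan/parquet/statistics.rs.
use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
use datafusion::error::Result;
use datafusion::parquet::file::metadata::ParquetMetaData;

/// Extract per-row-group min and max values for `column` as Arrow arrays.
fn row_group_min_max(
    metadata: &ParquetMetaData,
    arrow_schema: &Schema,
    column: &str,
) -> Result<(ArrayRef, ArrayRef)> {
    let parquet_schema = metadata.file_metadata().schema_descr();
    let converter = StatisticsConverter::try_new(column, arrow_schema, parquet_schema)?;
    let row_groups = metadata.row_groups();
    // One array entry per row group; entries are null where statistics are absent.
    let mins = converter.row_group_mins(row_groups.iter())?;
    let maxes = converter.row_group_maxes(row_groups.iter())?;
    Ok((mins, maxes))
}
```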