Merge commit '15045a8849e80327de68b1ec59e4a0644f05582c' into chunchun…/update-df-apr-week-4
appletreeisyellow committed Apr 29, 2024
2 parents 86a5025 + 15045a8 commit fac857a
Showing 37 changed files with 213 additions and 173 deletions.
5 changes: 2 additions & 3 deletions .asf.yaml
@@ -27,7 +27,7 @@ notifications:
jira_options: link label worklog
github:
description: "Apache DataFusion SQL Query Engine"
homepage: https://arrow.apache.org/datafusion
homepage: https://datafusion.apache.org/
labels:
- arrow
- big-data
@@ -50,7 +50,6 @@ github:
required_approving_review_count: 1

# publishes the content of the `asf-site` branch to
# https://arrow.apache.org/datafusion/
# https://datafusion.apache.org/
publish:
whoami: asf-site
subdir: datafusion
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -17,4 +17,4 @@
under the License.
-->

See the [Contributor Guide](https://arrow.apache.org/datafusion/contributor-guide/index.html)
See the [Contributor Guide](https://datafusion.apache.org/contributor-guide/index.html)
14 changes: 7 additions & 7 deletions README.md
@@ -46,10 +46,10 @@ in-memory format. [Python Bindings](https://github.com/apache/datafusion-python)

Here are links to some important information

- [Project Site](https://arrow.apache.org/datafusion)
- [Installation](https://arrow.apache.org/datafusion/user-guide/cli/installation.html)
- [Rust Getting Started](https://arrow.apache.org/datafusion/user-guide/example-usage.html)
- [Rust DataFrame API](https://arrow.apache.org/datafusion/user-guide/dataframe.html)
- [Project Site](https://datafusion.apache.org/)
- [Installation](https://datafusion.apache.org/user-guide/cli/installation.html)
- [Rust Getting Started](https://datafusion.apache.org/user-guide/example-usage.html)
- [Rust DataFrame API](https://datafusion.apache.org/user-guide/dataframe.html)
- [Rust API docs](https://docs.rs/datafusion/latest/datafusion)
- [Rust Examples](https://github.com/apache/datafusion/tree/master/datafusion-examples)
- [Python DataFrame API](https://arrow.apache.org/datafusion-python/)
@@ -58,14 +58,14 @@ Here are links to some important information
## What can you do with this crate?

DataFusion is great for building projects such as domain specific query engines, new database platforms and data pipelines, query languages and more.
It lets you start quickly from a fully working engine, and then customize those features specific to your use. [Click Here](https://arrow.apache.org/datafusion/user-guide/introduction.html#known-users) to see a list known users.
It lets you start quickly from a fully working engine, and then customize those features specific to your use. [Click Here](https://datafusion.apache.org/user-guide/introduction.html#known-users) to see a list known users.

## Contributing to DataFusion

Please see the [contributor guide] and [communication] pages for more information.

[contributor guide]: https://arrow.apache.org/datafusion/contributor-guide
[communication]: https://arrow.apache.org/datafusion/contributor-guide/communication.html
[contributor guide]: https://datafusion.apache.org/contributor-guide
[communication]: https://datafusion.apache.org/contributor-guide/communication.html

## Crate features

4 changes: 1 addition & 3 deletions benchmarks/src/tpch/convert.rs
@@ -88,9 +88,7 @@ impl ConvertOpt {
.schema()
.iter()
.take(schema.fields.len() - 1)
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.map(Expr::from)
.collect();

csv = csv.select(selection)?;
4 changes: 2 additions & 2 deletions datafusion-cli/README.md
@@ -21,15 +21,15 @@

# DataFusion Command-line Interface

[DataFusion](https://arrow.apache.org/datafusion/) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
[DataFusion](https://datafusion.apache.org/) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.

DataFusion CLI (`datafusion-cli`) is a small command line utility that runs SQL queries using the DataFusion engine.

# Frequently Asked Questions

## Where can I find more information?

See the [`datafusion-cli` documentation](https://arrow.apache.org/datafusion/user-guide/cli.html) for further information.
See the [`datafusion-cli` documentation](https://datafusion.apache.org/user-guide/cli/index.html) for further information.

## How do I make my IDE work with `datafusion-cli`?

11 changes: 10 additions & 1 deletion datafusion/common/src/column.rs
@@ -17,7 +17,7 @@

//! Column
use arrow_schema::Field;
use arrow_schema::{Field, FieldRef};

use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
@@ -63,6 +63,8 @@ impl Column {
}

/// Create Column from unqualified name.
///
/// Alias for `Column::new_unqualified`
pub fn from_name(name: impl Into<String>) -> Self {
Self {
relation: None,
@@ -346,6 +348,13 @@ impl From<(Option<&TableReference>, &Field)> for Column {
}
}

/// Create a column, use qualifier and field name
impl From<(Option<&TableReference>, &FieldRef)> for Column {
fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
Self::new(relation.cloned(), field.name())
}
}

impl FromStr for Column {
type Err = Infallible;

7 changes: 3 additions & 4 deletions datafusion/common/src/tree_node.rs
@@ -405,18 +405,17 @@ pub trait TreeNode: Sized {
/// Returns true if `f` returns true for any node in the tree.
///
/// Stops recursion as soon as a matching node is found
fn exists<F: FnMut(&Self) -> bool>(&self, mut f: F) -> bool {
fn exists<F: FnMut(&Self) -> Result<bool>>(&self, mut f: F) -> Result<bool> {
let mut found = false;
self.apply(|n| {
Ok(if f(n) {
Ok(if f(n)? {
found = true;
TreeNodeRecursion::Stop
} else {
TreeNodeRecursion::Continue
})
})
.unwrap();
found
.map(|_| found)
}

/// Low-level API used to implement other APIs.
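
The change to `exists` makes the predicate fallible and propagates its error instead of calling `.unwrap()`. The sketch below shows the same contract on a hypothetical `Node` tree with a `String` error type. It recurses directly rather than going through `apply` and `TreeNodeRecursion`, so it illustrates the behaviour and is not the DataFusion implementation.

```rust
// Hypothetical stand-in for a tree of expressions or plan nodes.
enum Node {
    Leaf(i64),
    Branch(Vec<Node>),
}

impl Node {
    /// Returns `Ok(true)` if `f` returns `Ok(true)` for any node. Nodes are
    /// visited pre-order and the walk stops at the first match. An `Err`
    /// from `f` is returned to the caller instead of panicking.
    fn exists<F>(&self, f: &mut F) -> Result<bool, String>
    where
        F: FnMut(&Node) -> Result<bool, String>,
    {
        if f(self)? {
            return Ok(true);
        }
        if let Node::Branch(children) = self {
            for child in children {
                if child.exists(f)? {
                    return Ok(true);
                }
            }
        }
        Ok(false)
    }
}
```

Callers whose predicate cannot fail wrap the result in `Ok(...)`, as `contains_outer` does in this commit.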
2 changes: 1 addition & 1 deletion datafusion/core/README.md
@@ -27,5 +27,5 @@ This crate contains the main entry points and high level DataFusion APIs such as

For more information, please see:

- [DataFusion Website](https://arrow.apache.org/datafusion)
- [DataFusion Website](https://datafusion.apache.org)
- [DataFusion API Docs](https://docs.rs/datafusion/latest/datafusion/)
2 changes: 1 addition & 1 deletion datafusion/core/src/catalog/mod.rs
@@ -175,7 +175,7 @@ impl CatalogProviderList for MemoryCatalogProviderList {
/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can
/// read from Delta Lake tables
///
/// [`datafusion-cli`]: https://arrow.apache.org/datafusion/user-guide/cli.html
/// [`datafusion-cli`]: https://datafusion.apache.org/user-guide/cli/index.html
/// [`DynamicFileCatalogProvider`]: https://github.com/apache/datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75
/// [`catalog.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/catalog.rs
/// [delta-rs]: https://github.com/delta-io/delta-rs
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
@@ -1332,7 +1332,7 @@ impl DataFrame {
col_exists = true;
new_column.clone()
} else {
col(Column::from((qualifier, field.as_ref())))
col(Column::from((qualifier, field)))
}
})
.collect();
@@ -1402,9 +1402,9 @@ impl DataFrame {
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
col(Column::from((qualifier, field.as_ref()))).alias(new_name)
col(Column::from((qualifier, field))).alias(new_name)
} else {
col(Column::from((qualifier, field.as_ref())))
col(Column::from((qualifier, field)))
}
})
.collect::<Vec<_>>();
6 changes: 3 additions & 3 deletions datafusion/core/src/lib.rs
@@ -30,10 +30,10 @@
//! additional data sources, query languages, functions, custom
//! operators and more. See the [Architecture] section for more details.
//!
//! [DataFusion]: https://arrow.apache.org/datafusion/
//! [DataFusion]: https://datafusion.apache.org/
//! [Apache Arrow]: https://arrow.apache.org
//! [use cases]: https://arrow.apache.org/datafusion/user-guide/introduction.html#use-cases
//! [SQL]: https://arrow.apache.org/datafusion/user-guide/sql/index.html
//! [use cases]: https://datafusion.apache.org/user-guide/introduction.html#use-cases
//! [SQL]: https://datafusion.apache.org/user-guide/sql/index.html
//! [`DataFrame`]: dataframe::DataFrame
//! [Architecture]: #architecture
//!
11 changes: 2 additions & 9 deletions datafusion/core/src/physical_planner.rs
@@ -1261,15 +1261,8 @@ impl DefaultPhysicalPlanner {

// Remove temporary projected columns
if left_projected || right_projected {
let final_join_result = join_schema
.iter()
.map(|(qualifier, field)| {
Expr::Column(datafusion_common::Column::from((
qualifier,
field.as_ref(),
)))
})
.collect::<Vec<_>>();
let final_join_result =
join_schema.iter().map(Expr::from).collect::<Vec<_>>();
let projection = LogicalPlan::Projection(Projection::try_new(
final_join_result,
Arc::new(new_join),
2 changes: 1 addition & 1 deletion datafusion/expr/src/aggregate_function.rs
@@ -30,7 +30,7 @@ use strum_macros::EnumIter;

/// Enum of all built-in aggregate functions
// Contributor's guide for adding new aggregate functions
// https://arrow.apache.org/datafusion/contributor-guide/index.html#how-to-add-a-new-aggregate-function
// https://datafusion.apache.org/contributor-guide/index.html#how-to-add-a-new-aggregate-function
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
pub enum AggregateFunction {
/// Count
62 changes: 51 additions & 11 deletions datafusion/expr/src/expr.rs
@@ -32,7 +32,7 @@ use crate::{
Signature,
};

use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, FieldRef};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{
internal_err, plan_err, Column, DFSchema, Result, ScalarValue, TableReference,
@@ -84,6 +84,29 @@ use sqlparser::ast::NullTreatment;
/// assert_eq!(binary_expr.op, Operator::Eq);
/// }
/// ```
///
/// ## Return a list of [`Expr::Column`] from a schema's columns
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::{DFSchema, Column};
/// # use datafusion_expr::Expr;
///
/// let arrow_schema = Schema::new(vec![
/// Field::new("c1", DataType::Int32, false),
/// Field::new("c2", DataType::Float64, false),
/// ]);
/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
///
/// // Form a list of expressions for each item in the schema
/// let exprs: Vec<_> = df_schema.iter()
/// .map(Expr::from)
/// .collect();
///
/// assert_eq!(exprs, vec![
/// Expr::from(Column::from_qualified_name("t1.c1")),
/// Expr::from(Column::from_qualified_name("t1.c2")),
/// ]);
/// ```
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub enum Expr {
/// An expression with a specific name.
@@ -190,6 +213,23 @@ impl Default for Expr {
}
}

/// Create an [`Expr`] from a [`Column`]
impl From<Column> for Expr {
fn from(value: Column) -> Self {
Expr::Column(value)
}
}

/// Create an [`Expr`] from an optional qualifier and a [`FieldRef`]. This is
/// useful for creating [`Expr`] from a [`DFSchema`].
///
/// See example on [`Expr`]
impl<'a> From<(Option<&'a TableReference>, &'a FieldRef)> for Expr {
fn from(value: (Option<&'a TableReference>, &'a FieldRef)) -> Self {
Expr::from(Column::from(value))
}
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
pub expr: Box<Expr>,
@@ -1231,7 +1271,16 @@ impl Expr {

/// Return true when the expression contains out reference(correlated) expressions.
pub fn contains_outer(&self) -> bool {
self.exists(|expr| matches!(expr, Expr::OuterReferenceColumn { .. }))
self.exists(|expr| Ok(matches!(expr, Expr::OuterReferenceColumn { .. })))
.unwrap()
}

/// Returns true if the expression is volatile, i.e. whether it can return different
/// results when evaluated multiple times with the same input.
pub fn is_volatile(&self) -> Result<bool> {
self.exists(|expr| {
Ok(matches!(expr, Expr::ScalarFunction(func) if func.func_def.is_volatile()?))
})
}

/// Recursively find all [`Expr::Placeholder`] expressions, and
@@ -1891,15 +1940,6 @@ fn create_names(exprs: &[Expr]) -> Result<String> {
.join(", "))
}

/// Whether the given expression is volatile, i.e. whether it can return different results
/// when evaluated multiple times with the same input.
pub fn is_volatile(expr: &Expr) -> Result<bool> {
match expr {
Expr::ScalarFunction(func) => func.func_def.is_volatile(),
_ => Ok(false),
}
}

#[cfg(test)]
mod test {
use crate::expr::Cast;
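
Moving `is_volatile` from a free function to a method built on `exists` also changes what it inspects. The removed function matched only the top-level expression, while the new method reports a volatile function anywhere in the tree. A minimal sketch of that difference follows, assuming a hypothetical two-variant `Expr` with a plain `volatile` flag and omitting the `Result` wrapper for brevity. It is not the DataFusion type.

```rust
// Hypothetical stand-in for an expression tree; not DataFusion's `Expr`.
enum Expr {
    Literal(i64),
    ScalarFunction { name: String, volatile: bool, args: Vec<Expr> },
}

/// Shape of the removed free function: looks at the top-level node only.
fn is_volatile_top_level(expr: &Expr) -> bool {
    matches!(expr, Expr::ScalarFunction { volatile: true, .. })
}

impl Expr {
    /// Shape of the new method: true if this node or any descendant is a
    /// volatile function call.
    fn is_volatile(&self) -> bool {
        match self {
            Expr::Literal(_) => false,
            Expr::ScalarFunction { volatile, args, .. } => {
                *volatile || args.iter().any(Expr::is_volatile)
            }
        }
    }
}
```

For a stable function applied to a volatile one, the top-level check returns `false` while the tree-wide check returns `true`.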
8 changes: 1 addition & 7 deletions datafusion/expr/src/expr_rewriter/mod.rs
@@ -218,13 +218,7 @@ pub fn coerce_plan_expr_for_schema(
Ok(LogicalPlan::Projection(projection))
}
_ => {
let exprs: Vec<Expr> = plan
.schema()
.iter()
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.collect();
let exprs: Vec<Expr> = plan.schema().iter().map(Expr::from).collect();

let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?;
let add_project = new_exprs.iter().any(|expr| expr.try_into_col().is_err());
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
@@ -1577,7 +1577,7 @@ pub fn unnest_with_options(
return Ok(input);
}
};
qualified_columns.push(Column::from((unnest_qualifier, unnested_field.as_ref())));
qualified_columns.push(Column::from((unnest_qualifier, &unnested_field)));
unnested_fields.insert(index, unnested_field);
}

2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/tree_node.rs
@@ -363,7 +363,7 @@ impl TreeNode for LogicalPlan {

/// Converts a `Arc<LogicalPlan>` without copying, if possible. Copies the plan
/// if there is a shared reference
fn unwrap_arc(plan: Arc<LogicalPlan>) -> LogicalPlan {
pub fn unwrap_arc(plan: Arc<LogicalPlan>) -> LogicalPlan {
Arc::try_unwrap(plan)
// if None is returned, there is another reference to this
// LogicalPlan, so we can not own it, and must clone instead
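
The hunk is cut off before the fallback branch, so here is a generic sketch of the take-or-clone pattern that the doc comment describes. It assumes any `T: Clone` rather than `LogicalPlan`, and the body is an illustration rather than the exact code in the file. Note that `Arc::try_unwrap` returns a `Result`: on failure it hands back the original `Arc` in `Err`, which is then cloned.

```rust
use std::sync::Arc;

/// Takes the value out of the `Arc` without copying when this is the only
/// reference; otherwise clones the shared value.
fn unwrap_arc<T: Clone>(value: Arc<T>) -> T {
    Arc::try_unwrap(value).unwrap_or_else(|shared| (*shared).clone())
}
```

When other references exist, they stay valid because only a clone is returned.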
9 changes: 2 additions & 7 deletions datafusion/expr/src/utils.rs
@@ -356,12 +356,7 @@ fn get_exprs_except_skipped(
columns_to_skip: HashSet<Column>,
) -> Vec<Expr> {
if columns_to_skip.is_empty() {
schema
.iter()
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.collect::<Vec<Expr>>()
schema.iter().map(Expr::from).collect::<Vec<Expr>>()
} else {
schema
.columns()
@@ -855,7 +850,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
match expr {
Expr::Column(col) => {
let (qualifier, field) = plan.schema().qualified_field_from_column(col)?;
Ok(Expr::Column(Column::from((qualifier, field))))
Ok(Expr::from(Column::from((qualifier, field))))
}
_ => Ok(Expr::Column(Column::from_name(expr.display_name()?))),
}