diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 3e696d6b93a62..0fa84ccd5b3b4 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -62,7 +62,7 @@ cargo run --example csv_sql - [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es - [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file - [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files -- ['parquet_exec_visitor.rs'](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution +- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution - [`pruning.rs`](examples/parquet_sql.rs): Use pruning to rule out files based on statistics - [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3 - [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 2c1470a1d6ecf..558b79125a221 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -25,9 +25,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::common::DFSchema; use datafusion::error::Result; use datafusion::optimizer::simplify_expressions::ExprSimplifier; -use datafusion::physical_expr::{ - analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr, -}; +use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries}; use datafusion::prelude::*; use datafusion_common::{ScalarValue, ToDFSchema}; use 
datafusion_expr::execution_props::ExecutionProps; @@ -92,7 +90,8 @@ fn evaluate_demo() -> Result<()> { let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8))); // First, you make a "physical expression" from the logical `Expr` - let physical_expr = physical_expr(&batch.schema(), expr)?; + let df_schema = DFSchema::try_from(batch.schema())?; + let physical_expr = SessionContext::new().create_physical_expr(&df_schema, expr)?; // Now, you can evaluate the expression against the RecordBatch let result = physical_expr.evaluate(&batch)?; @@ -213,7 +212,7 @@ fn range_analysis_demo() -> Result<()> { // `date < '2020-10-01' AND date > '2020-09-01'` // As always, we need to tell DataFusion the type of column "date" - let schema = Schema::new(vec![make_field("date", DataType::Date32)]); + let schema = Arc::new(Schema::new(vec![make_field("date", DataType::Date32)])); // You can provide DataFusion any known boundaries on the values of `date` // (for example, maybe you know you only have data up to `2020-09-15`), but @@ -222,9 +221,13 @@ fn range_analysis_demo() -> Result<()> { let boundaries = ExprBoundaries::try_new_unbounded(&schema)?; // Now, we invoke the analysis code to perform the range analysis - let physical_expr = physical_expr(&schema, expr)?; - let analysis_result = - analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?; + let df_schema = DFSchema::try_from(schema)?; + let physical_expr = SessionContext::new().create_physical_expr(&df_schema, expr)?; + let analysis_result = analyze( + &physical_expr, + AnalysisContext::new(boundaries), + df_schema.as_ref(), + )?; // The results of the analysis is an range, encoded as an `Interval`, for // each column in the schema, that must be true in order for the predicate @@ -248,21 +251,6 @@ fn make_ts_field(name: &str) -> Field { make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz)) } -/// Build a physical expression from a logical one, after applying simplification and type coercion -pub fn 
physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> { - let df_schema = schema.clone().to_dfschema_ref()?; - - // Simplify - let props = ExecutionProps::new(); - let simplifier = - ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone())); - - // apply type coercion here to ensure types match - let expr = simplifier.coerce(expr, &df_schema)?; - - create_physical_expr(&expr, df_schema.as_ref(), &props) -} - /// This function shows how to use `Expr::get_type` to retrieve the DataType /// of an expression fn expression_type_demo() -> Result<()> { diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index b2a3de72356c2..d097623b560c7 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -125,6 +125,20 @@ impl DFSchema { } } + /// Return a reference to the inner Arrow [`Schema`] + /// + /// Note this does not have the qualifier information + pub fn as_arrow(&self) -> &Schema { + self.inner.as_ref() + } + + /// Return a reference to the inner Arrow [`SchemaRef`] + /// + /// Note this does not have the qualifier information + pub fn inner(&self) -> &SchemaRef { + &self.inner + } + /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier pub fn new_with_metadata( qualified_fields: Vec<(Option, Arc)>, @@ -806,6 +820,12 @@ impl From<&DFSchema> for Schema { } } +impl AsRef<Schema> for DFSchema { + fn as_ref(&self) -> &Schema { + self.as_arrow() + } +} + /// Create a `DFSchema` from an Arrow schema impl TryFrom for DFSchema { type Error = DataFusionError; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index d83644597e784..4df7366d2e629 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -70,13 +70,13 @@ use datafusion_common::{ config::{ConfigExtension, TableOptions}, exec_err, not_impl_err, plan_datafusion_err, plan_err,
tree_node::{TreeNodeRecursion, TreeNodeVisitor}, - SchemaReference, TableReference, + DFSchema, SchemaReference, TableReference, }; use datafusion_execution::registry::SerializerRegistry; use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, var_provider::is_system_variables, - Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, + Expr, ExprSchemable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, }; use datafusion_sql::{ parser::{CopyToSource, CopyToStatement, DFParser}, @@ -91,10 +91,14 @@ use sqlparser::dialect::dialect_from_str; use url::Url; use uuid::Uuid; +use crate::physical_expr::PhysicalExpr; pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; pub use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr_rewriter::FunctionRewrite; +use datafusion_expr::simplify::SimplifyInfo; +use datafusion_optimizer::simplify_expressions::ExprSimplifier; +use datafusion_physical_expr::create_physical_expr; mod avro; mod csv; @@ -510,6 +514,18 @@ impl SessionContext { } } + /// Creates a [`PhysicalExpr`] from an [`Expr`] after applying type coercion and function rewrites. + /// + /// See [`SessionState::create_physical_expr`] for more details and + /// examples. + pub fn create_physical_expr( + &self, + df_schema: &DFSchema, + expr: Expr, + ) -> Result<Arc<dyn PhysicalExpr>> { + self.state.read().create_physical_expr(df_schema, expr) + } + // return an empty dataframe fn return_empty_dataframe(&self) -> Result<DataFrame> { let plan = LogicalPlanBuilder::empty(false).build()?; @@ -1320,6 +1336,7 @@ pub enum RegisterFunction { /// Table user defined function Table(String, Arc<dyn TableFunctionImpl>), } + /// Execution context for registering data sources and executing queries. /// See [`SessionContext`] for a higher level API. /// @@ -1930,13 +1947,14 @@ impl SessionState { } } - /// Creates a physical plan from a logical plan. + /// Creates a physical [`ExecutionPlan`] from a [`LogicalPlan`]. /// /// Note: this first calls [`Self::optimize`] on the provided /// plan.
/// - /// This function will error for [`LogicalPlan`]s such as catalog - /// DDL `CREATE TABLE` must be handled by another layer. + /// This function will error for [`LogicalPlan`]s such as catalog DDL like + /// `CREATE TABLE`, which do not have corresponding physical plans and must + /// be handled by another layer, typically [`SessionContext`]. pub async fn create_physical_plan( &self, logical_plan: &LogicalPlan, ) -> Result<Arc<dyn ExecutionPlan>> { @@ -1947,6 +1965,35 @@ impl SessionState { .await } + /// Creates a [`PhysicalExpr`] from an [`Expr`] after applying type + /// coercion, and function rewrites. + /// + /// Note that no simplification (TODO link) is applied. + /// + /// TODO links to coercion, simplification, and rewrites + /// + /// # Example: + /// ``` + ///TODO + /// ``` + pub fn create_physical_expr( + &self, + // todo make this schema + df_schema: &DFSchema, + expr: Expr, + ) -> Result<Arc<dyn PhysicalExpr>> { + // Simplify + let simplifier = + ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema)); + + // apply type coercion here to ensure types match + let expr = simplifier.coerce(expr, df_schema)?; + // TODO should we also simplify the expression? + // simplifier.simplify() + + create_physical_expr(&expr, df_schema, self.execution_props()) + } + /// Return the session ID pub fn session_id(&self) -> &str { &self.session_id @@ -2024,6 +2071,35 @@ impl SessionState { } } +struct SessionSimplifyProvider<'a> { + state: &'a SessionState, + df_schema: &'a DFSchema, +} + +impl<'a> SessionSimplifyProvider<'a> { + fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self { + Self { state, df_schema } + } +} + +impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> { + fn is_boolean_type(&self, expr: &Expr) -> Result<bool> { + Ok(expr.get_type(self.df_schema)? 
== DataType::Boolean) + } + + fn nullable(&self, expr: &Expr) -> Result<bool> { + expr.nullable(self.df_schema) + } + + fn execution_props(&self) -> &ExecutionProps { + self.state.execution_props() + } + + fn get_data_type(&self, expr: &Expr) -> Result<DataType> { + expr.get_type(self.df_schema) + } +} + struct SessionContextProvider<'a> { state: &'a SessionState, tables: HashMap<String, Arc<dyn TableSource>>,