Add standalone example of using the SQL frontend #11088

Merged 11 commits on Jul 2, 2024
1 change: 1 addition & 0 deletions datafusion-examples/README.md
@@ -80,6 +80,7 @@ cargo run --example csv_sql
- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
- [`simple_udwf.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
- [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures
- [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from SQL strings
- [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser`
- [`to_char.rs`](examples/to_char.rs): Examples of using the to_char function
- [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using to_timestamp functions
208 changes: 208 additions & 0 deletions datafusion-examples/examples/sql_frontend.rs
@@ -0,0 +1,208 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{plan_err, Result};
use datafusion_expr::{
AggregateUDF, Expr, LogicalPlan, ScalarUDF, TableProviderFilterPushDown, TableSource,
WindowUDF,
};
use datafusion_optimizer::{
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule,
};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;
use std::any::Any;
use std::sync::Arc;

/// This example shows how to use DataFusion's SQL planner to parse SQL text and
/// build `LogicalPlan`s without executing them.
///
/// For example, if you need a SQL planner and optimizer like Apache Calcite,
/// but do not want a Java runtime dependency for some reason, you could use
/// DataFusion as a SQL frontend.
///
/// Normally, users interact with DataFusion via SessionContext. However, using
/// SessionContext requires depending on the full `datafusion` core crate.
///
/// In this example, we demonstrate how to use the lower level APIs directly,
/// which only requires the `datafusion-sql` dependency.
pub fn main() -> Result<()> {
// First, we parse the SQL string. This example uses the `sqlparser-rs`
// `Parser` (re-exported by `datafusion-sql`) directly. DataFusion also
// provides `DFParser`, which wraps it and adds DataFusion-specific syntax
// such as `CREATE EXTERNAL TABLE`.
let dialect = PostgreSqlDialect {};
let sql = "SELECT name FROM person WHERE age BETWEEN 21 AND 32";
let statements = Parser::parse_sql(&dialect, sql)?;

// Now, use DataFusion's SQL planner, called `SqlToRel`, to create a
// `LogicalPlan` from the parsed statement.
//
// To invoke `SqlToRel` we must provide it with schema and function
// information via an object that implements the `ContextProvider` trait.
//
// The resulting plan is:
//
// Projection: person.name
//   Filter: person.age BETWEEN Int64(21) AND Int64(32)
//     TableScan: person
let context_provider = MyContextProvider::default();
let sql_to_rel = SqlToRel::new(&context_provider);
let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?;
println!(
Contributor: maybe we can use assert! here to let the user know the expected output?

Contributor Author: that is a good idea -- I will do so (the existing examples are inconsistent with that).

Contributor: Filed #11230

"Unoptimized Logical Plan:\n\n{}\n",
logical_plan.display_indent()
);

// The initial LogicalPlan is a mechanical translation from the parsed SQL
// and often cannot run without the Analyzer passes.
//
// In this example, `person.age` is actually a different data type (UInt8)
Contributor: In the source schema, the data type is defined as UInt8.

// than the values it is compared to, which are Int64. Most
// execution engines, including DataFusion's, will fail if you provide such
// a plan.
//
// To prepare it to run, we must apply type coercion to align types, and
// check for other semantic errors. In DataFusion this is done by a
// component called the Analyzer.
//
// Projection: person.name
//   Filter: CAST(person.age AS Int64) BETWEEN Int64(21) AND Int64(32)
//     TableScan: person
let config = OptimizerContext::default().with_skip_failing_rules(false);
let analyzed_plan = Analyzer::new().execute_and_check(
logical_plan,
config.options(),
observe_analyzer,
)?;
println!(
Contributor: assert?

"Analyzed Logical Plan:\n\n{}\n",
analyzed_plan.display_indent()
);

// As we can see, the Analyzer added a CAST so the types are the same
// (Int64). However, this plan is not as efficient as it could be, as it
// will require casting *each row* of the input to Int64 before comparison
// to 21 and 32. To optimize this query's performance, it is better to cast
// the constants once at plan time to UInt8.
Contributor (@mustafasrepo, Jul 2, 2024): I guess this should be to UInt8

Contributor Author: Good catch -- fixed in 6152fd6

//
// Query optimization is handled in DataFusion by a component called the
// Optimizer, which we now invoke
//
// This results in a fully optimized plan:
//
// TableScan: person projection=[name], full_filters=[person.age >= UInt8(21), person.age <= UInt8(32)]
//
// The optimizer did several things to this plan:
// 1. Removed casts from person.age as we described above
// 2. Converted BETWEEN to two single-column inequalities (which are typically faster to execute)
// 3. Pushed the projection of `name` down to the scan (so the scan only returns that column)
// 4. Pushed the filter into the scan
let optimized_plan =
Optimizer::new().optimize(analyzed_plan, &config, observe_optimizer)?;
println!(
Contributor: assert?

"Optimized Logical Plan:\n\n{}\n",
optimized_plan.display_indent()
);

Ok(())
}
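
The benefit of casting the constants once, rather than casting every row, can be illustrated without DataFusion. The following is a minimal standalone sketch, not DataFusion's implementation: `filter_cast_per_row` and `filter_narrowed_bounds` are hypothetical names, and a plain `&[u8]` slice stands in for the `age` column.

```rust
use std::convert::TryFrom;

/// Filter by widening every row value to i64, mirroring
/// `CAST(person.age AS Int64) BETWEEN Int64(21) AND Int64(32)`.
fn filter_cast_per_row(ages: &[u8], lo: i64, hi: i64) -> Vec<u8> {
    ages.iter()
        .copied()
        .filter(|&age| {
            let widened = i64::from(age); // one cast per row
            widened >= lo && widened <= hi
        })
        .collect()
}

/// Filter by narrowing the constants to u8 once, mirroring
/// `person.age >= UInt8(21) AND person.age <= UInt8(32)`.
/// Returns `None` when a bound does not fit in u8; a caller could then
/// fall back to the per-row version (an assumption of this sketch).
fn filter_narrowed_bounds(ages: &[u8], lo: i64, hi: i64) -> Option<Vec<u8>> {
    let lo = u8::try_from(lo).ok()?; // done once, at "plan time"
    let hi = u8::try_from(hi).ok()?;
    Some(
        ages.iter()
            .copied()
            .filter(|&age| age >= lo && age <= hi)
            .collect(),
    )
}
```

For ages `[18, 21, 25, 32, 40]` and bounds 21 and 32, both functions keep 21, 25 and 32. The second does so without any per-row conversion, and it also shows the BETWEEN predicate written as two inequalities.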

// Note that both the optimizer and the analyzer take a callback, called an
// "observer", that is invoked after each pass. We do not do anything with these
// callbacks in this example.

fn observe_analyzer(_plan: &LogicalPlan, _rule: &dyn AnalyzerRule) {}
fn observe_optimizer(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
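
To show what an observer could be used for, here is a minimal standalone sketch of a pass runner that calls a callback after each rule. It is a toy model, not DataFusion's `Analyzer` or `Optimizer`: the `Rule` struct, `run_rules`, the two rules, and the string-based "plan" are all hypothetical stand-ins.

```rust
/// A toy rewrite rule: a name plus a function that rewrites a "plan" string.
struct Rule {
    name: &'static str,
    apply: fn(&str) -> String,
}

/// Apply each rule in order, invoking `observer` after every pass with the
/// rewritten plan and the rule that produced it.
fn run_rules<F>(plan: &str, rules: &[Rule], mut observer: F) -> String
where
    F: FnMut(&str, &Rule),
{
    let mut current = plan.to_string();
    for rule in rules {
        current = (rule.apply)(&current);
        observer(&current, rule);
    }
    current
}

/// Toy rule: rewrite the BETWEEN predicate as two inequalities.
fn expand_between(plan: &str) -> String {
    plan.replace("age BETWEEN 21 AND 32", "age >= 21 AND age <= 32")
}

/// Toy rule: changes nothing, to show that the observer still sees it.
fn noop(plan: &str) -> String {
    plan.to_string()
}

/// Run both toy rules and collect the rule names the observer saw.
fn observed_rule_names(plan: &str) -> (String, Vec<&'static str>) {
    let rules = [
        Rule { name: "expand_between", apply: expand_between },
        Rule { name: "noop", apply: noop },
    ];
    let mut seen = Vec::new();
    let result = run_rules(plan, &rules, |_plan, rule| seen.push(rule.name));
    (result, seen)
}
```

An observer of this shape is a convenient place to log which rule changed the plan, which is useful when debugging an unexpected rewrite.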

/// Implements the `ContextProvider` trait required to plan SQL
#[derive(Default)]
struct MyContextProvider {
options: ConfigOptions,
}

impl ContextProvider for MyContextProvider {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
if name.table() == "person" {
Ok(Arc::new(MyTableSource {
schema: Arc::new(Schema::new(vec![
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::UInt8, false),
])),
}))
} else {
plan_err!("table not found")
}
}

fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
None
}

fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
None
}

fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
None
}

fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
None
}

fn options(&self) -> &ConfigOptions {
&self.options
}

fn udf_names(&self) -> Vec<String> {
Vec::new()
}

fn udaf_names(&self) -> Vec<String> {
Vec::new()
}

fn udwf_names(&self) -> Vec<String> {
Vec::new()
}
}
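
The context provider above knows a single hard-coded table. A provider that resolves several tables would typically look names up in a map. The following is a standalone sketch of that lookup only, with hypothetical names (`ToyCatalog`, `ToySchema`, a made-up `city` table) and plain strings in place of Arrow types. It does not implement `ContextProvider`.

```rust
use std::collections::HashMap;

/// Toy schema: column names paired with type names.
type ToySchema = Vec<(&'static str, &'static str)>;

/// Toy catalog that resolves table names to schemas.
struct ToyCatalog {
    tables: HashMap<String, ToySchema>,
}

impl ToyCatalog {
    /// Build a catalog with `person` and a second, made-up `city` table.
    fn new() -> Self {
        let mut tables = HashMap::new();
        tables.insert(
            "person".to_string(),
            vec![("name", "Utf8"), ("age", "UInt8")],
        );
        tables.insert("city".to_string(), vec![("name", "Utf8")]);
        ToyCatalog { tables }
    }

    /// Look up a table, naming the missing table in the error message.
    fn table_schema(&self, name: &str) -> Result<&ToySchema, String> {
        self.tables
            .get(name)
            .ok_or_else(|| format!("table not found: {}", name))
    }
}
```

Including the requested name in the error, as this sketch does, makes a failed lookup easier to diagnose than a bare "table not found".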

/// TableSource is the part of TableProvider needed for creating a LogicalPlan.
struct MyTableSource {
schema: SchemaRef,
}

impl TableSource for MyTableSource {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

// For this example, we report to the DataFusion optimizer that
// this provider can apply filters during the scan
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
}
}
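
The review comments suggest asserting on the printed plans instead of only printing them. The idea can be sketched without DataFusion: the toy `PlanNode` type below is hypothetical and only imitates the indented, one-node-per-line layout shown in the example's comments.

```rust
/// A toy plan node: a one-line description plus at most one input.
struct PlanNode {
    description: String,
    input: Option<Box<PlanNode>>,
}

impl PlanNode {
    fn new(description: &str, input: Option<PlanNode>) -> Self {
        PlanNode {
            description: description.to_string(),
            input: input.map(Box::new),
        }
    }

    /// Render the tree one node per line, indenting each level by two spaces.
    fn display_indent(&self) -> String {
        let mut lines = Vec::new();
        let mut node = Some(self);
        let mut depth = 0;
        while let Some(current) = node {
            lines.push(format!("{}{}", "  ".repeat(depth), current.description));
            node = current.input.as_deref();
            depth += 1;
        }
        lines.join("\n")
    }
}

/// Build the toy equivalent of the unoptimized plan from the example.
fn unoptimized_plan() -> PlanNode {
    let scan = PlanNode::new("TableScan: person", None);
    let filter = PlanNode::new(
        "Filter: person.age BETWEEN Int64(21) AND Int64(32)",
        Some(scan),
    );
    PlanNode::new("Projection: person.name", Some(filter))
}
```

A test can then compare `unoptimized_plan().display_indent()` with the expected text using `assert_eq!`. In the example itself, a similar check could presumably compare `logical_plan.display_indent().to_string()` against the plan text given in the comments.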
19 changes: 11 additions & 8 deletions datafusion/expr/src/table_source.rs
@@ -71,14 +71,17 @@ impl std::fmt::Display for TableType {
}
}

-/// The TableSource trait is used during logical query planning and optimizations and
-/// provides access to schema information and filter push-down capabilities. This trait
-/// provides a subset of the functionality of the TableProvider trait in the core
-/// datafusion crate. The TableProvider trait provides additional capabilities needed for
-/// physical query execution (such as the ability to perform a scan). The reason for
-/// having two separate traits is to avoid having the logical plan code be dependent
-/// on the DataFusion execution engine. Other projects may want to use DataFusion's
-/// logical plans and have their own execution engine.
+/// Access schema information and filter push-down capabilities.
+///
+/// The TableSource trait is used during logical query planning and
+/// optimizations and provides a subset of the functionality of the
+/// `TableProvider` trait in the (core) `datafusion` crate. The `TableProvider`
+/// trait provides additional capabilities needed for physical query execution
+/// (such as the ability to perform a scan).
+///
+/// The reason for having two separate traits is to avoid having the logical
+/// plan code be dependent on the DataFusion execution engine. Some projects use
+/// DataFusion's logical plans and have their own execution engine.
pub trait TableSource: Sync + Send {
fn as_any(&self) -> &dyn Any;
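
The split described in the doc comment can be sketched with two toy traits. This is an illustration under simplified assumptions, not the real `TableSource` or `TableProvider` definitions: `Source`, `Provider`, `PersonTable`, and `plan_scan` are hypothetical names, and plain strings stand in for schemas and plans.

```rust
/// Planning-time view of a table: metadata only, no data access.
trait Source {
    /// Column names paired with type names.
    fn schema(&self) -> Vec<(&'static str, &'static str)>;
    /// Whether a filter on the named column can be applied during the scan.
    fn supports_filter_on(&self, column: &str) -> bool;
}

/// Execution-time view: everything `Source` offers plus the ability to scan.
trait Provider: Source {
    fn scan_ages(&self, min_age: u8) -> Vec<u8>;
}

struct PersonTable {
    ages: Vec<u8>,
}

impl Source for PersonTable {
    fn schema(&self) -> Vec<(&'static str, &'static str)> {
        vec![("name", "Utf8"), ("age", "UInt8")]
    }

    fn supports_filter_on(&self, column: &str) -> bool {
        column == "age"
    }
}

impl Provider for PersonTable {
    fn scan_ages(&self, min_age: u8) -> Vec<u8> {
        self.ages.iter().copied().filter(|&a| a >= min_age).collect()
    }
}

/// A toy "planner" that depends only on `Source`, never on `Provider`.
fn plan_scan(source: &dyn Source, filter_column: &str) -> String {
    let columns: Vec<&str> = source.schema().into_iter().map(|(name, _)| name).collect();
    let placement = if source.supports_filter_on(filter_column) {
        "pushed down"
    } else {
        "applied after scan"
    };
    format!(
        "TableScan: columns=[{}], filter on {} {}",
        columns.join(", "),
        filter_column,
        placement
    )
}
```

Because `plan_scan` accepts `&dyn Source`, a project with its own execution engine could implement only the planning-side trait and never provide a scan.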
