-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add standalone example of using the SQL frontend #11088
Changes from 7 commits
fa8db51
c6944ce
afa37f8
f222a10
682e4d2
3b9d9c0
6152fd6
14a24b4
2d913d8
984604e
7bf2b20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; | ||
use datafusion_common::config::ConfigOptions; | ||
use datafusion_common::{plan_err, Result}; | ||
use datafusion_expr::{ | ||
AggregateUDF, Expr, LogicalPlan, ScalarUDF, TableProviderFilterPushDown, TableSource, | ||
WindowUDF, | ||
}; | ||
use datafusion_optimizer::{ | ||
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule, | ||
}; | ||
use datafusion_sql::planner::{ContextProvider, SqlToRel}; | ||
use datafusion_sql::sqlparser::dialect::PostgreSqlDialect; | ||
use datafusion_sql::sqlparser::parser::Parser; | ||
use datafusion_sql::TableReference; | ||
use std::any::Any; | ||
use std::sync::Arc; | ||
|
||
/// This example shows how to use DataFusion's SQL planner to parse SQL text and | ||
/// build `LogicalPlan`s without executing them. | ||
/// | ||
/// For example, if you need a SQL planner and optimizer like Apache Calcite, | ||
/// but do not want a Java runtime dependency for some reason, you could use | ||
/// DataFusion as a SQL frontend. | ||
/// | ||
/// Normally, users interact with DataFusion via SessionContext. However, using | ||
/// SessionContext requires depending on the full `datafusion` core crate. | ||
/// | ||
/// In this example, we demonstrate how to use the lower level APIs directly, | ||
/// which only requires the `datafusion-sql` dependency. | ||
pub fn main() -> Result<()> { | ||
// First, we parse the SQL string. Note that we use the DataFusion | ||
// Parser, which wraps the `sqlparser-rs` SQL parser and adds DataFusion | ||
// specific syntax such as `CREATE EXTERNAL TABLE` | ||
let dialect = PostgreSqlDialect {}; | ||
let sql = "SELECT name FROM person WHERE age BETWEEN 21 AND 32"; | ||
let statements = Parser::parse_sql(&dialect, sql)?; | ||
|
||
// Now, use DataFusion's SQL planner, called `SqlToRel` to create a | ||
// `LogicalPlan` from the parsed statement | ||
// | ||
// To invoke SqlToRel we must provide it schema and function information | ||
// via an object that implements the `ContextProvider` trait | ||
// | ||
// Projection: person.name | ||
// Filter: person.age BETWEEN Int64(21) AND Int64(32) | ||
// TableScan: person | ||
let context_provider = MyContextProvider::default(); | ||
let sql_to_rel = SqlToRel::new(&context_provider); | ||
let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?; | ||
println!( | ||
"Unoptimized Logical Plan:\n\n{}\n", | ||
logical_plan.display_indent() | ||
); | ||
|
||
// The initial LogicalPlan is a mechanical translation from the parsed SQL | ||
// and often can not run without the Analyzer passes. | ||
// | ||
// In this example, `person.age` is actually a different data type (Int32) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At the source schema data type defined as |
||
// than the values to which it is compared to which are Int64. Most | ||
// execution engines, including DataFusion's, will fail if you provide such | ||
// a plan. | ||
// | ||
// To prepare it to run, we must apply type coercion to align types, and | ||
// check for other semantic errors. In DataFusion this is done by a | ||
// component called the Analyzer. | ||
// | ||
// Projection: person.name | ||
// Filter: CAST(person.age AS Int64) BETWEEN Int64(21) AND Int64(32) | ||
// TableScan: person | ||
let config = OptimizerContext::default().with_skip_failing_rules(false); | ||
let analyzed_plan = Analyzer::new().execute_and_check( | ||
logical_plan, | ||
config.options(), | ||
observe_analyzer, | ||
)?; | ||
println!( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. assert? |
||
"Analyzed Logical Plan:\n\n{}\n", | ||
analyzed_plan.display_indent() | ||
); | ||
|
||
// As we can see, the Analyzer added a CAST so the types are the same | ||
// (Int64). However, this plan is not as efficient as it could be, as it | ||
// will require casting *each row* of the input to UInt64 before comparison | ||
// to 21 and 32. To optimize this query's performance, it is better to cast | ||
// the constants once at plan time to UInt8. | ||
// | ||
// Query optimization is handled in DataFusion by a component called the | ||
// Optimizer, which we now invoke | ||
// | ||
// This results in a fully optimized plan: | ||
// | ||
// TableScan: person projection=[name], full_filters=[person.age >= UInt8(21), person.age <= UInt8(32)] | ||
// | ||
// The optimizer did several things to this plan: | ||
// 1. Removed casts from person.age as we described above | ||
// 2. Converted BETWEEN to two single columns inequalities (which are typically faster to execute) | ||
// 3. Pushed the projection of `name` down to the scan (so the scan only returns that column) | ||
// 4. Pushed the filter into the scan | ||
let optimized_plan = | ||
Optimizer::new().optimize(analyzed_plan, &config, observe_optimizer)?; | ||
println!( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. assert? |
||
"Optimized Logical Plan:\n\n{}\n", | ||
optimized_plan.display_indent() | ||
); | ||
|
||
Ok(()) | ||
} | ||
|
||
// Note that both the optimizer and the analyzer take a callback, called an | ||
// "observer" that is invoked after each pass. We do not do anything with these | ||
// callbacks in this example | ||
|
||
fn observe_analyzer(_plan: &LogicalPlan, _rule: &dyn AnalyzerRule) {} | ||
fn observe_optimizer(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} | ||
|
||
/// Implements the `ContextProvider` trait required to plan SQL | ||
#[derive(Default)] | ||
struct MyContextProvider { | ||
options: ConfigOptions, | ||
} | ||
|
||
impl ContextProvider for MyContextProvider { | ||
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> { | ||
if name.table() == "person" { | ||
Ok(Arc::new(MyTableSource { | ||
schema: Arc::new(Schema::new(vec![ | ||
Field::new("name", DataType::Utf8, false), | ||
Field::new("age", DataType::UInt8, false), | ||
])), | ||
})) | ||
} else { | ||
plan_err!("table not found") | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> { | ||
None | ||
} | ||
|
||
fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> { | ||
None | ||
} | ||
|
||
fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> { | ||
None | ||
} | ||
|
||
fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> { | ||
None | ||
} | ||
|
||
fn options(&self) -> &ConfigOptions { | ||
&self.options | ||
} | ||
|
||
fn udf_names(&self) -> Vec<String> { | ||
Vec::new() | ||
} | ||
|
||
fn udaf_names(&self) -> Vec<String> { | ||
Vec::new() | ||
} | ||
|
||
fn udwf_names(&self) -> Vec<String> { | ||
Vec::new() | ||
} | ||
} | ||
|
||
/// TableSource is the part of TableProvider needed for creating a LogicalPlan. | ||
struct MyTableSource { | ||
schema: SchemaRef, | ||
} | ||
|
||
impl TableSource for MyTableSource { | ||
fn as_any(&self) -> &dyn Any { | ||
self | ||
} | ||
|
||
fn schema(&self) -> SchemaRef { | ||
self.schema.clone() | ||
} | ||
|
||
// For this example, we report to the DataFusion optimizer that | ||
// this provider can apply filters during the scan | ||
fn supports_filters_pushdown( | ||
&self, | ||
filters: &[&Expr], | ||
) -> Result<Vec<TableProviderFilterPushDown>> { | ||
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can use `assert!` here to let the user know the expected output?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that is a good idea -- I will do so (the existing examples are inconsistent with that).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed #11230