-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add documentation and usability for prepared parameters #7785
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,6 @@ | |
|
||
//! Expr module contains core type definition for `Expr`. | ||
|
||
use crate::aggregate_function; | ||
use crate::built_in_function; | ||
use crate::expr_fn::binary_expr; | ||
use crate::logical_plan::Subquery; | ||
|
@@ -26,8 +25,10 @@ use crate::utils::{expr_to_columns, find_out_reference_exprs}; | |
use crate::window_frame; | ||
use crate::window_function; | ||
use crate::Operator; | ||
use crate::{aggregate_function, ExprSchemable}; | ||
use arrow::datatypes::DataType; | ||
use datafusion_common::internal_err; | ||
use datafusion_common::tree_node::{Transformed, TreeNode}; | ||
use datafusion_common::{internal_err, DFSchema}; | ||
use datafusion_common::{plan_err, Column, DataFusionError, Result, ScalarValue}; | ||
use std::collections::HashSet; | ||
use std::fmt; | ||
|
@@ -599,10 +600,13 @@ impl InSubquery { | |
} | ||
} | ||
|
||
/// Placeholder | ||
/// Placeholder, representing bind parameter values such as `$1`. | ||
/// | ||
/// The type of these parameters is inferred using [`Expr::infer_placeholder_types`] | ||
/// or can be specified directly using `PREPARE` statements. | ||
#[derive(Clone, PartialEq, Eq, Hash, Debug)] | ||
pub struct Placeholder { | ||
/// The identifier of the parameter (e.g, $1 or $foo) | ||
/// The identifier of the parameter, including the leading `$` (e.g., `"$1"` or `"$foo"`) | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub id: String, | ||
/// The type the parameter will be filled in with | ||
pub data_type: Option<DataType>, | ||
|
@@ -1030,6 +1034,52 @@ impl Expr { | |
pub fn contains_outer(&self) -> bool { | ||
!find_out_reference_exprs(self).is_empty() | ||
} | ||
|
||
/// Recursively find all [`Expr::Placeholder`] expressions, and | ||
/// infer their [`DataType`] from the context of their use. | ||
/// | ||
/// For example, given an expression like `<int32> = $0`, this will infer `$0` to | ||
/// have type `int32`. | ||
pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<Expr> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was moved from the |
||
self.transform(&|mut expr| { | ||
// Default to assuming the arguments are the same type | ||
if let Expr::BinaryExpr(BinaryExpr { left, op: _, right }) = &mut expr { | ||
rewrite_placeholder(left.as_mut(), right.as_ref(), schema)?; | ||
rewrite_placeholder(right.as_mut(), left.as_ref(), schema)?; | ||
}; | ||
if let Expr::Between(Between { | ||
expr, | ||
negated: _, | ||
low, | ||
high, | ||
}) = &mut expr | ||
{ | ||
rewrite_placeholder(low.as_mut(), expr.as_ref(), schema)?; | ||
rewrite_placeholder(high.as_mut(), expr.as_ref(), schema)?; | ||
} | ||
Ok(Transformed::Yes(expr)) | ||
}) | ||
} | ||
} | ||
|
||
// Sets the data type of `expr` to the type of `other` when `expr` is a | ||
// placeholder whose type is not yet known. | ||
// | ||
// `expr` is left untouched if it is not an `Expr::Placeholder`, or if the | ||
// placeholder already carries a `data_type`. | ||
// | ||
// Returns an error (with added context naming both expressions) if | ||
// `other.get_type(schema)` fails. | ||
fn rewrite_placeholder(expr: &mut Expr, other: &Expr, schema: &DFSchema) -> Result<()> { | ||
if let Expr::Placeholder(Placeholder { id: _, data_type }) = expr { | ||
// Only infer when no type was given; an existing type is never overwritten | ||
if data_type.is_none() { | ||
let other_dt = other.get_type(schema); | ||
match other_dt { | ||
Err(e) => { | ||
// `?` applied to an `Err` returns early from this function with the wrapped error | ||
Err(e.context(format!( | ||
"Can not find type of {other} needed to infer type of {expr}" | ||
)))?; | ||
} | ||
Ok(dt) => { | ||
*data_type = Some(dt); | ||
} | ||
} | ||
}; | ||
} | ||
Ok(()) | ||
} | ||
|
||
#[macro_export] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -928,8 +928,40 @@ impl LogicalPlan { | |
} | ||
} | ||
} | ||
/// Convert a prepared [`LogicalPlan`] into its inner logical plan | ||
/// with all params replaced with their corresponding values | ||
/// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`] | ||
/// with the specified `param_values`. | ||
/// | ||
/// [`LogicalPlan::Prepare`] nodes are | ||
/// converted to their inner logical plan for execution. | ||
/// | ||
/// # Example | ||
/// ``` | ||
/// # use arrow::datatypes::{Field, Schema, DataType}; | ||
/// use datafusion_common::ScalarValue; | ||
/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan, placeholder}; | ||
/// # let schema = Schema::new(vec![ | ||
/// # Field::new("id", DataType::Int32, false), | ||
/// # ]); | ||
/// // Build SELECT * FROM t1 WHERE id = $1 | ||
/// let plan = table_scan(Some("t1"), &schema, None).unwrap() | ||
/// .filter(col("id").eq(placeholder("$1"))).unwrap() | ||
/// .build().unwrap(); | ||
/// | ||
/// assert_eq!("Filter: t1.id = $1\ | ||
/// \n TableScan: t1", | ||
/// plan.display_indent().to_string() | ||
/// ); | ||
/// | ||
/// // Fill in the parameter $1 with a literal 3 | ||
/// let plan = plan.with_param_values(vec![ | ||
/// ScalarValue::from(3i32) // value at index 0 --> $1 | ||
/// ]).unwrap(); | ||
/// | ||
/// assert_eq!("Filter: t1.id = Int32(3)\ | ||
/// \n TableScan: t1", | ||
/// plan.display_indent().to_string() | ||
/// ); | ||
/// ``` | ||
pub fn with_param_values( | ||
self, | ||
param_values: Vec<ScalarValue>, | ||
|
@@ -961,7 +993,7 @@ impl LogicalPlan { | |
let input_plan = prepare_lp.input; | ||
input_plan.replace_params_with_values(¶m_values) | ||
} | ||
_ => Ok(self), | ||
_ => self.replace_params_with_values(¶m_values), | ||
} | ||
} | ||
|
||
|
@@ -1060,7 +1092,7 @@ impl LogicalPlan { | |
} | ||
|
||
impl LogicalPlan { | ||
/// applies collect to any subqueries in the plan | ||
/// applies `op` to any subqueries in the plan | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. drive by cleanups |
||
pub(crate) fn apply_subqueries<F>(&self, op: &mut F) -> datafusion_common::Result<()> | ||
where | ||
F: FnMut(&Self) -> datafusion_common::Result<VisitRecursion>, | ||
|
@@ -1112,17 +1144,22 @@ impl LogicalPlan { | |
Ok(()) | ||
} | ||
|
||
/// Return a logical plan with all placeholders/params (e.g $1 $2, | ||
/// ...) replaced with corresponding values provided in the | ||
/// params_values | ||
/// Return a `LogicalPlan` with all placeholders (e.g. `$1`, `$2`, | ||
/// ...) replaced with corresponding values provided in | ||
/// `param_values` | ||
/// | ||
/// See [`Self::with_param_values`] for examples and usage | ||
pub fn replace_params_with_values( | ||
&self, | ||
param_values: &[ScalarValue], | ||
) -> Result<LogicalPlan> { | ||
let new_exprs = self | ||
.expressions() | ||
.into_iter() | ||
.map(|e| Self::replace_placeholders_with_values(e, param_values)) | ||
.map(|e| { | ||
let e = e.infer_placeholder_types(self.schema())?; | ||
Self::replace_placeholders_with_values(e, param_values) | ||
}) | ||
.collect::<Result<Vec<_>>>()?; | ||
|
||
let new_inputs_with_values = self | ||
|
@@ -1219,7 +1256,9 @@ impl LogicalPlan { | |
// Various implementations for printing out LogicalPlans | ||
impl LogicalPlan { | ||
/// Return a `format`able structure that produces a single line | ||
/// per node. For example: | ||
/// per node. | ||
/// | ||
/// # Example | ||
/// | ||
/// ```text | ||
/// Projection: employee.id | ||
|
@@ -2321,7 +2360,7 @@ pub struct Unnest { | |
mod tests { | ||
use super::*; | ||
use crate::logical_plan::table_scan; | ||
use crate::{col, exists, in_subquery, lit}; | ||
use crate::{col, exists, in_subquery, lit, placeholder}; | ||
use arrow::datatypes::{DataType, Field, Schema}; | ||
use datafusion_common::tree_node::TreeNodeVisitor; | ||
use datafusion_common::{not_impl_err, DFSchema, TableReference}; | ||
|
@@ -2767,10 +2806,7 @@ digraph { | |
|
||
let plan = table_scan(TableReference::none(), &schema, None) | ||
.unwrap() | ||
.filter(col("id").eq(Expr::Placeholder(Placeholder::new( | ||
"".into(), | ||
Some(DataType::Int32), | ||
)))) | ||
.filter(col("id").eq(placeholder(""))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here is an example of the new |
||
.unwrap() | ||
.build() | ||
.unwrap(); | ||
|
@@ -2783,10 +2819,7 @@ digraph { | |
|
||
let plan = table_scan(TableReference::none(), &schema, None) | ||
.unwrap() | ||
.filter(col("id").eq(Expr::Placeholder(Placeholder::new( | ||
"$0".into(), | ||
Some(DataType::Int32), | ||
)))) | ||
.filter(col("id").eq(placeholder("$0"))) | ||
.unwrap() | ||
.build() | ||
.unwrap(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3684,6 +3684,19 @@ fn test_prepare_statement_should_infer_types() { | |
assert_eq!(actual_types, expected_types); | ||
} | ||
|
||
#[test] | ||
fn test_non_prepare_statement_should_infer_types() { | ||
// Non prepared statements (like SELECT) should also have their parameter types inferred | ||
let sql = "SELECT 1 + $1"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice |
||
let plan = logical_plan(sql).unwrap(); | ||
let actual_types = plan.get_parameter_types().unwrap(); | ||
let expected_types = HashMap::from([ | ||
// constant 1 is inferred to be int64 | ||
("$1".to_string(), Some(DataType::Int64)), | ||
]); | ||
assert_eq!(actual_types, expected_types); | ||
} | ||
|
||
#[test] | ||
#[should_panic( | ||
expected = "value: SQL(ParserError(\"Expected [NOT] NULL or TRUE|FALSE or [NOT] DISTINCT FROM after IS, found: $1\"" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️