Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend backtrace coverage for DatafusionError::Plan errors errors #7803

Merged
merged 6 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ async fn exec_and_print(
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
let dialect = dialect_from_str(dialect).ok_or_else(|| {
DataFusionError::Plan(format!(
plan_err_raw!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
))
)
})?;
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
Expand Down
13 changes: 7 additions & 6 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;

use crate::error::{unqualified_field_not_found, DataFusionError, Result, SchemaError};
use crate::error::{
unqualified_field_not_found, DataFusionError, Result, SchemaError, _plan_err,
};
use crate::{
field_not_found, Column, FunctionalDependencies, OwnedTableReference, TableReference,
};
Expand Down Expand Up @@ -187,10 +189,10 @@ impl DFSchema {
match &self.fields[i].qualifier {
Some(qualifier) => {
if (qualifier.to_string() + "." + self.fields[i].name()) == name {
return Err(DataFusionError::Plan(format!(
return _plan_err!(
"Fully qualified field name '{name}' was supplied to `index_of` \
which is deprecated. Please use `index_of_column_by_name` instead"
)));
);
}
}
None => (),
Expand Down Expand Up @@ -378,12 +380,11 @@ impl DFSchema {
.zip(arrow_schema.fields().iter())
.try_for_each(|(l_field, r_field)| {
if !can_cast_types(r_field.data_type(), l_field.data_type()) {
Err(DataFusionError::Plan(
format!("Column {} (type: {}) is not compatible with column {} (type: {})",
_plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
r_field.name(),
r_field.data_type(),
l_field.name(),
l_field.data_type())))
l_field.data_type())
} else {
Ok(())
}
Expand Down
18 changes: 13 additions & 5 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,16 @@ macro_rules! with_dollar_sign {
/// plan_err!("Error {val}")
/// plan_err!("Error {val:?}")
macro_rules! make_error {
($NAME:ident, $ERR:ident) => {
($NAME:ident, $NAME_RAW: ident, $ERR:ident) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please document in comments what $NAME and $NAME_RAW are and how they are different?

my reading is that the raw is the name of an internal macro that gets generated. I think it would help to document more clearly what the difference in plan_err! and plan_err_raw! did (specifically that plan_err returns an Err(plan_err_raw)

with_dollar_sign! {
($d:tt) => {
#[macro_export]
macro_rules! $NAME_RAW {
($d($d args:expr),*) => {
DataFusionError::$ERR(format!("{}{}", format!($d($d args),*), DataFusionError::get_back_trace()).into())
}
}

#[macro_export]
macro_rules! $NAME {
($d($d args:expr),*) => {
Expand All @@ -493,16 +500,16 @@ macro_rules! make_error {
}

// Exposes a macro to create `DataFusionError::Plan`
make_error!(plan_err, Plan);
make_error!(plan_err, plan_err_raw, Plan);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not able to invent better naming, appreciate if anyone helps

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe plan_err_obj or plan_datafusion_error?


// Exposes a macro to create `DataFusionError::Internal`
make_error!(internal_err, Internal);
make_error!(internal_err, internal_err_raw, Internal);

// Exposes a macro to create `DataFusionError::NotImplemented`
make_error!(not_impl_err, NotImplemented);
make_error!(not_impl_err, not_impl_err_raw, NotImplemented);

// Exposes a macro to create `DataFusionError::Execution`
make_error!(exec_err, Execution);
make_error!(exec_err, exec_err_raw, Execution);

// Exposes a macro to create `DataFusionError::SQL`
#[macro_export]
Expand All @@ -517,6 +524,7 @@ macro_rules! sql_err {
pub use exec_err as _exec_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
pub use plan_err as _plan_err;

#[cfg(test)]
mod test {
Expand Down
25 changes: 13 additions & 12 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::vec::IntoIter;

use crate::error::_plan_err;
use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};

use sqlparser::ast::TableConstraint;
Expand Down Expand Up @@ -95,18 +96,18 @@ impl Constraints {
Constraint::Unique(indices)
})
}
TableConstraint::ForeignKey { .. } => Err(DataFusionError::Plan(
"Foreign key constraints are not currently supported".to_string(),
)),
TableConstraint::Check { .. } => Err(DataFusionError::Plan(
"Check constraints are not currently supported".to_string(),
)),
TableConstraint::Index { .. } => Err(DataFusionError::Plan(
"Indexes are not currently supported".to_string(),
)),
TableConstraint::FulltextOrSpatial { .. } => Err(DataFusionError::Plan(
"Indexes are not currently supported".to_string(),
)),
TableConstraint::ForeignKey { .. } => {
_plan_err!("Foreign key constraints are not currently supported")
}
TableConstraint::Check { .. } => {
_plan_err!("Check constraints are not currently supported")
}
TableConstraint::Index { .. } => {
_plan_err!("Indexes are not currently supported")
}
TableConstraint::FulltextOrSpatial { .. } => {
_plan_err!("Indexes are not currently supported")
}
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraints::new_unverified(constraints))
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,10 @@ impl FromStr for ListingTableInsertMode {
"append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
"append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
"error" => Ok(ListingTableInsertMode::Error),
_ => Err(DataFusionError::Plan(format!(
_ => plan_err!(
"Unknown or unsupported insert mode {s}. Supported options are \
append_to_file, append_new_files, and error."
))),
),
}
}
}
Expand Down Expand Up @@ -865,10 +865,10 @@ impl TableProvider for ListingTable {
let writer_mode = match self.options.insert_mode {
ListingTableInsertMode::AppendToFile => {
if input_partitions > file_groups.len() {
return Err(DataFusionError::Plan(format!(
return plan_err!(
"Cannot append {input_partitions} partitions to {} files!",
file_groups.len()
)));
);
}

crate::datasource::file_format::write::FileWriterMode::Append
Expand Down Expand Up @@ -919,9 +919,9 @@ impl TableProvider for ListingTable {
self.options().insert_mode,
ListingTableInsertMode::AppendToFile
) {
return Err(DataFusionError::Plan(
"Cannot insert into a sorted ListingTable with mode append!".into(),
));
return plan_err!(
"Cannot insert into a sorted ListingTable with mode append!"
);
}
// Multiple sort orders in outer vec are equivalent, so we pass only the first one
let ordering = self
Expand Down
30 changes: 9 additions & 21 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
};
use datafusion_common::{
alias::AliasGenerator,
exec_err, not_impl_err, plan_err,
exec_err, not_impl_err, plan_err, plan_err_raw,
tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
};
use datafusion_execution::registry::SerializerRegistry;
Expand Down Expand Up @@ -1577,17 +1577,11 @@ impl SessionState {
self.catalog_list
.catalog(&resolved_ref.catalog)
.ok_or_else(|| {
DataFusionError::Plan(format!(
"failed to resolve catalog: {}",
resolved_ref.catalog
))
plan_err_raw!("failed to resolve catalog: {}", resolved_ref.catalog)
})?
.schema(&resolved_ref.schema)
.ok_or_else(|| {
DataFusionError::Plan(format!(
"failed to resolve schema: {}",
resolved_ref.schema
))
plan_err_raw!("failed to resolve schema: {}", resolved_ref.schema)
})
}

Expand Down Expand Up @@ -1689,11 +1683,11 @@ impl SessionState {
dialect: &str,
) -> Result<datafusion_sql::parser::Statement> {
let dialect = dialect_from_str(dialect).ok_or_else(|| {
DataFusionError::Plan(format!(
plan_err_raw!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
))
)
})?;
let mut statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
if statements.len() > 1 {
Expand Down Expand Up @@ -2022,7 +2016,7 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
self.tables
.get(&name)
.cloned()
.ok_or_else(|| DataFusionError::Plan(format!("table '{name}' not found")))
.ok_or_else(|| plan_err_raw!("table '{name}' not found"))
}

fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
Expand Down Expand Up @@ -2069,29 +2063,23 @@ impl FunctionRegistry for SessionState {
let result = self.scalar_functions.get(name);

result.cloned().ok_or_else(|| {
DataFusionError::Plan(format!(
"There is no UDF named \"{name}\" in the registry"
))
plan_err_raw!("There is no UDF named \"{name}\" in the registry")
})
}

fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
let result = self.aggregate_functions.get(name);

result.cloned().ok_or_else(|| {
DataFusionError::Plan(format!(
"There is no UDAF named \"{name}\" in the registry"
))
plan_err_raw!("There is no UDAF named \"{name}\" in the registry")
})
}

fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
let result = self.window_functions.get(name);

result.cloned().ok_or_else(|| {
DataFusionError::Plan(format!(
"There is no UDWF named \"{name}\" in the registry"
))
plan_err_raw!("There is no UDWF named \"{name}\" in the registry")
})
}
}
Expand Down
7 changes: 3 additions & 4 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_common::{downcast_value, ScalarValue};
use datafusion_common::{downcast_value, plan_err_raw, ScalarValue};
use datafusion_common::{
internal_err, plan_err,
tree_node::{Transformed, TreeNode},
Expand Down Expand Up @@ -450,9 +450,8 @@ fn build_statistics_record_batch<S: PruningStatistics>(
arrays
);

RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
DataFusionError::Plan(format!("Can not create statistics record batch: {err}"))
})
RecordBatch::try_new_with_options(schema, arrays, &options)
.map_err(|err| plan_err_raw!("Can not create statistics record batch: {err}"))
}

struct PruningExpressionBuilder<'a> {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_common::{plan_err, plan_err_raw, DataFusionError, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{
Expand Down Expand Up @@ -127,7 +127,7 @@ pub(crate) fn pushdown_sorts(
let plan = &requirements.plan;
let parent_required = requirements.required_ordering.as_deref();
const ERR_MSG: &str = "Expects parent requirement to contain something";
let err = || DataFusionError::Plan(ERR_MSG.to_string());
let err = || plan_err_raw!("{}", ERR_MSG);
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let mut new_plan = plan.clone();
if !ordering_satisfy_requirement(
Expand Down Expand Up @@ -199,7 +199,7 @@ fn pushdown_requirement_to_children(
parent_required: Option<&[PhysicalSortRequirement]>,
) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
const ERR_MSG: &str = "Expects parent requirement to contain something";
let err = || DataFusionError::Plan(ERR_MSG.to_string());
let err = || plan_err_raw!("{}", ERR_MSG);
let maintains_input_order = plan.maintains_input_order();
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
Expand Down
10 changes: 3 additions & 7 deletions datafusion/execution/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{

use datafusion_common::{
config::{ConfigOptions, Extensions},
DataFusionError, Result,
plan_err_raw, DataFusionError, Result,
};
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};

Expand Down Expand Up @@ -182,19 +182,15 @@ impl FunctionRegistry for TaskContext {
let result = self.scalar_functions.get(name);

result.cloned().ok_or_else(|| {
DataFusionError::Plan(format!(
"There is no UDF named \"{name}\" in the TaskContext"
))
plan_err_raw!("There is no UDF named \"{name}\" in the TaskContext")
})
}

fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
let result = self.aggregate_functions.get(name);

result.cloned().ok_or_else(|| {
DataFusionError::Plan(format!(
"There is no UDAF named \"{name}\" in the TaskContext"
))
plan_err_raw!("There is no UDAF named \"{name}\" in the TaskContext")
})
}

Expand Down
15 changes: 9 additions & 6 deletions datafusion/expr/src/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::utils;
use crate::{type_coercion::aggregates::*, Signature, TypeSignature, Volatility};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_common::{plan_err, plan_err_raw, DataFusionError, Result};
use std::sync::Arc;
use std::{fmt, str::FromStr};
use strum_macros::EnumIter;
Expand Down Expand Up @@ -232,11 +232,14 @@ impl AggregateFunction {
// original errors are all related to wrong function signature
// aggregate them for better error message
.map_err(|_| {
DataFusionError::Plan(utils::generate_signature_error_msg(
&format!("{self}"),
self.signature(),
input_expr_types,
))
plan_err_raw!(
"{}",
utils::generate_signature_error_msg(
&format!("{self}"),
self.signature(),
input_expr_types,
)
)
})?;

match self {
Expand Down
15 changes: 9 additions & 6 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
Volatility,
};
use arrow::datatypes::{DataType, Field, Fields, IntervalUnit, TimeUnit};
use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
use datafusion_common::{internal_err, plan_err, plan_err_raw, DataFusionError, Result};
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
Expand Down Expand Up @@ -501,11 +501,14 @@ impl BuiltinScalarFunction {

// verify that this is a valid set of data types for this function
data_types(input_expr_types, &self.signature()).map_err(|_| {
DataFusionError::Plan(utils::generate_signature_error_msg(
&format!("{self}"),
self.signature(),
input_expr_types,
))
plan_err_raw!(
"{}",
utils::generate_signature_error_msg(
&format!("{self}"),
self.signature(),
input_expr_types,
)
)
})?;

// the return type of the built in function.
Expand Down
Loading