Skip to content

Commit

Permalink
Extend backtrace coverage for DatafusionError::Plan errors errors (#…
Browse files Browse the repository at this point in the history
…7803)

* improve backtrace coverag for plan errors

* fix cli

* cli fmt

* fix tests

* docs

* doc fmt
  • Loading branch information
comphead authored Oct 16, 2023
1 parent fa2bb6c commit cb6f7fe
Show file tree
Hide file tree
Showing 29 changed files with 236 additions and 218 deletions.
5 changes: 3 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
},
print_options::{MaxRows, PrintOptions},
};
use datafusion::common::plan_datafusion_err;
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
use datafusion::{
datasource::listing::ListingTableUrl,
Expand Down Expand Up @@ -202,11 +203,11 @@ async fn exec_and_print(
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
let dialect = dialect_from_str(dialect).ok_or_else(|| {
DataFusionError::Plan(format!(
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
))
)
})?;
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
Expand Down
13 changes: 7 additions & 6 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;

use crate::error::{unqualified_field_not_found, DataFusionError, Result, SchemaError};
use crate::error::{
unqualified_field_not_found, DataFusionError, Result, SchemaError, _plan_err,
};
use crate::{
field_not_found, Column, FunctionalDependencies, OwnedTableReference, TableReference,
};
Expand Down Expand Up @@ -187,10 +189,10 @@ impl DFSchema {
match &self.fields[i].qualifier {
Some(qualifier) => {
if (qualifier.to_string() + "." + self.fields[i].name()) == name {
return Err(DataFusionError::Plan(format!(
return _plan_err!(
"Fully qualified field name '{name}' was supplied to `index_of` \
which is deprecated. Please use `index_of_column_by_name` instead"
)));
);
}
}
None => (),
Expand Down Expand Up @@ -378,12 +380,11 @@ impl DFSchema {
.zip(arrow_schema.fields().iter())
.try_for_each(|(l_field, r_field)| {
if !can_cast_types(r_field.data_type(), l_field.data_type()) {
Err(DataFusionError::Plan(
format!("Column {} (type: {}) is not compatible with column {} (type: {})",
_plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
r_field.name(),
r_field.data_type(),
l_field.name(),
l_field.data_type())))
l_field.data_type())
} else {
Ok(())
}
Expand Down
26 changes: 20 additions & 6 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,12 +477,25 @@ macro_rules! with_dollar_sign {
/// plan_err!("Error {:?}", val)
/// plan_err!("Error {val}")
/// plan_err!("Error {val:?}")
///
/// `NAME_ERR` - macro name for wrapping Err(DataFusionError::*)
/// `NAME_DF_ERR` - macro name for wrapping DataFusionError::*. Needed to keep backtrace opportunity
/// in construction where DataFusionError::* used directly, like `map_err`, `ok_or_else`, etc
macro_rules! make_error {
($NAME:ident, $ERR:ident) => {
($NAME_ERR:ident, $NAME_DF_ERR: ident, $ERR:ident) => {
with_dollar_sign! {
($d:tt) => {
/// Macro wraps `$ERR` to add backtrace feature
#[macro_export]
macro_rules! $NAME_DF_ERR {
($d($d args:expr),*) => {
DataFusionError::$ERR(format!("{}{}", format!($d($d args),*), DataFusionError::get_back_trace()).into())
}
}

/// Macro wraps Err(`$ERR`) to add backtrace feature
#[macro_export]
macro_rules! $NAME {
macro_rules! $NAME_ERR {
($d($d args:expr),*) => {
Err(DataFusionError::$ERR(format!("{}{}", format!($d($d args),*), DataFusionError::get_back_trace()).into()))
}
Expand All @@ -493,16 +506,16 @@ macro_rules! make_error {
}

// Exposes a macro to create `DataFusionError::Plan`
make_error!(plan_err, Plan);
make_error!(plan_err, plan_datafusion_err, Plan);

// Exposes a macro to create `DataFusionError::Internal`
make_error!(internal_err, Internal);
make_error!(internal_err, internal_datafusion_err, Internal);

// Exposes a macro to create `DataFusionError::NotImplemented`
make_error!(not_impl_err, NotImplemented);
make_error!(not_impl_err, not_impl_datafusion_err, NotImplemented);

// Exposes a macro to create `DataFusionError::Execution`
make_error!(exec_err, Execution);
make_error!(exec_err, exec_datafusion_err, Execution);

// Exposes a macro to create `DataFusionError::SQL`
#[macro_export]
Expand All @@ -517,6 +530,7 @@ macro_rules! sql_err {
pub use exec_err as _exec_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
pub use plan_err as _plan_err;

#[cfg(test)]
mod test {
Expand Down
25 changes: 13 additions & 12 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::vec::IntoIter;

use crate::error::_plan_err;
use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};

use sqlparser::ast::TableConstraint;
Expand Down Expand Up @@ -95,18 +96,18 @@ impl Constraints {
Constraint::Unique(indices)
})
}
TableConstraint::ForeignKey { .. } => Err(DataFusionError::Plan(
"Foreign key constraints are not currently supported".to_string(),
)),
TableConstraint::Check { .. } => Err(DataFusionError::Plan(
"Check constraints are not currently supported".to_string(),
)),
TableConstraint::Index { .. } => Err(DataFusionError::Plan(
"Indexes are not currently supported".to_string(),
)),
TableConstraint::FulltextOrSpatial { .. } => Err(DataFusionError::Plan(
"Indexes are not currently supported".to_string(),
)),
TableConstraint::ForeignKey { .. } => {
_plan_err!("Foreign key constraints are not currently supported")
}
TableConstraint::Check { .. } => {
_plan_err!("Check constraints are not currently supported")
}
TableConstraint::Index { .. } => {
_plan_err!("Indexes are not currently supported")
}
TableConstraint::FulltextOrSpatial { .. } => {
_plan_err!("Indexes are not currently supported")
}
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraints::new_unverified(constraints))
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,10 @@ impl FromStr for ListingTableInsertMode {
"append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
"append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
"error" => Ok(ListingTableInsertMode::Error),
_ => Err(DataFusionError::Plan(format!(
_ => plan_err!(
"Unknown or unsupported insert mode {s}. Supported options are \
append_to_file, append_new_files, and error."
))),
),
}
}
}
Expand Down Expand Up @@ -865,10 +865,10 @@ impl TableProvider for ListingTable {
let writer_mode = match self.options.insert_mode {
ListingTableInsertMode::AppendToFile => {
if input_partitions > file_groups.len() {
return Err(DataFusionError::Plan(format!(
return plan_err!(
"Cannot append {input_partitions} partitions to {} files!",
file_groups.len()
)));
);
}

crate::datasource::file_format::write::FileWriterMode::Append
Expand Down Expand Up @@ -919,9 +919,9 @@ impl TableProvider for ListingTable {
self.options().insert_mode,
ListingTableInsertMode::AppendToFile
) {
return Err(DataFusionError::Plan(
"Cannot insert into a sorted ListingTable with mode append!".into(),
));
return plan_err!(
"Cannot insert into a sorted ListingTable with mode append!"
);
}
// Multiple sort orders in outer vec are equivalent, so we pass only the first one
let ordering = self
Expand Down
29 changes: 10 additions & 19 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
};
use datafusion_common::{
alias::AliasGenerator,
exec_err, not_impl_err, plan_err,
exec_err, not_impl_err, plan_datafusion_err, plan_err,
tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
};
use datafusion_execution::registry::SerializerRegistry;
Expand Down Expand Up @@ -1577,17 +1577,14 @@ impl SessionState {
self.catalog_list
.catalog(&resolved_ref.catalog)
.ok_or_else(|| {
DataFusionError::Plan(format!(
plan_datafusion_err!(
"failed to resolve catalog: {}",
resolved_ref.catalog
))
)
})?
.schema(&resolved_ref.schema)
.ok_or_else(|| {
DataFusionError::Plan(format!(
"failed to resolve schema: {}",
resolved_ref.schema
))
plan_datafusion_err!("failed to resolve schema: {}", resolved_ref.schema)
})
}

Expand Down Expand Up @@ -1689,11 +1686,11 @@ impl SessionState {
dialect: &str,
) -> Result<datafusion_sql::parser::Statement> {
let dialect = dialect_from_str(dialect).ok_or_else(|| {
DataFusionError::Plan(format!(
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
))
)
})?;
let mut statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
if statements.len() > 1 {
Expand Down Expand Up @@ -2022,7 +2019,7 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
self.tables
.get(&name)
.cloned()
.ok_or_else(|| DataFusionError::Plan(format!("table '{name}' not found")))
.ok_or_else(|| plan_datafusion_err!("table '{name}' not found"))
}

fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
Expand Down Expand Up @@ -2069,29 +2066,23 @@ impl FunctionRegistry for SessionState {
let result = self.scalar_functions.get(name);

result.cloned().ok_or_else(|| {
DataFusionError::Plan(format!(
"There is no UDF named \"{name}\" in the registry"
))
plan_datafusion_err!("There is no UDF named \"{name}\" in the registry")
})
}

fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
let result = self.aggregate_functions.get(name);

result.cloned().ok_or_else(|| {
DataFusionError::Plan(format!(
"There is no UDAF named \"{name}\" in the registry"
))
plan_datafusion_err!("There is no UDAF named \"{name}\" in the registry")
})
}

fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
let result = self.window_functions.get(name);

result.cloned().ok_or_else(|| {
DataFusionError::Plan(format!(
"There is no UDWF named \"{name}\" in the registry"
))
plan_datafusion_err!("There is no UDWF named \"{name}\" in the registry")
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_common::{downcast_value, ScalarValue};
use datafusion_common::{downcast_value, plan_datafusion_err, ScalarValue};
use datafusion_common::{
internal_err, plan_err,
tree_node::{Transformed, TreeNode},
Expand Down Expand Up @@ -451,7 +451,7 @@ fn build_statistics_record_batch<S: PruningStatistics>(
);

RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
DataFusionError::Plan(format!("Can not create statistics record batch: {err}"))
plan_datafusion_err!("Can not create statistics record batch: {err}")
})
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{
Expand Down Expand Up @@ -127,7 +127,7 @@ pub(crate) fn pushdown_sorts(
let plan = &requirements.plan;
let parent_required = requirements.required_ordering.as_deref();
const ERR_MSG: &str = "Expects parent requirement to contain something";
let err = || DataFusionError::Plan(ERR_MSG.to_string());
let err = || plan_datafusion_err!("{}", ERR_MSG);
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let mut new_plan = plan.clone();
if !ordering_satisfy_requirement(
Expand Down Expand Up @@ -199,7 +199,7 @@ fn pushdown_requirement_to_children(
parent_required: Option<&[PhysicalSortRequirement]>,
) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
const ERR_MSG: &str = "Expects parent requirement to contain something";
let err = || DataFusionError::Plan(ERR_MSG.to_string());
let err = || plan_datafusion_err!("{}", ERR_MSG);
let maintains_input_order = plan.maintains_input_order();
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
Expand Down
10 changes: 3 additions & 7 deletions datafusion/execution/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{

use datafusion_common::{
config::{ConfigOptions, Extensions},
DataFusionError, Result,
plan_datafusion_err, DataFusionError, Result,
};
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};

Expand Down Expand Up @@ -182,19 +182,15 @@ impl FunctionRegistry for TaskContext {
let result = self.scalar_functions.get(name);

result.cloned().ok_or_else(|| {
DataFusionError::Plan(format!(
"There is no UDF named \"{name}\" in the TaskContext"
))
plan_datafusion_err!("There is no UDF named \"{name}\" in the TaskContext")
})
}

fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
let result = self.aggregate_functions.get(name);

result.cloned().ok_or_else(|| {
DataFusionError::Plan(format!(
"There is no UDAF named \"{name}\" in the TaskContext"
))
plan_datafusion_err!("There is no UDAF named \"{name}\" in the TaskContext")
})
}

Expand Down
15 changes: 9 additions & 6 deletions datafusion/expr/src/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::utils;
use crate::{type_coercion::aggregates::*, Signature, TypeSignature, Volatility};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
use std::sync::Arc;
use std::{fmt, str::FromStr};
use strum_macros::EnumIter;
Expand Down Expand Up @@ -232,11 +232,14 @@ impl AggregateFunction {
// original errors are all related to wrong function signature
// aggregate them for better error message
.map_err(|_| {
DataFusionError::Plan(utils::generate_signature_error_msg(
&format!("{self}"),
self.signature(),
input_expr_types,
))
plan_datafusion_err!(
"{}",
utils::generate_signature_error_msg(
&format!("{self}"),
self.signature(),
input_expr_types,
)
)
})?;

match self {
Expand Down
Loading

0 comments on commit cb6f7fe

Please sign in to comment.