diff --git a/datafusion/functions/src/core/nullif.rs b/datafusion/functions/src/core/nullif.rs index f83bd987c937..73bfba9b38b1 100644 --- a/datafusion/functions/src/core/nullif.rs +++ b/datafusion/functions/src/core/nullif.rs @@ -18,15 +18,15 @@ //! Encoding expressions use arrow::datatypes::DataType; -use datafusion_common::{internal_err, Result, DataFusionError}; +use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr::ColumnarValue; -use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; -use std::any::Any; use arrow::array::Array; use arrow::compute::kernels::cmp::eq; use arrow::compute::kernels::nullif::nullif; use datafusion_common::ScalarValue; +use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; +use std::any::Any; #[derive(Debug)] pub(super) struct NullIfFunc { @@ -58,7 +58,7 @@ impl NullIfFunc { Self { signature: Signature::uniform(2, SUPPORTED_NULLIF_TYPES.to_vec(), - Volatility::Immutable, + Volatility::Immutable, ) } } @@ -81,7 +81,7 @@ impl ScalarUDFImpl for NullIfFunc { let coerced_types = datafusion_expr::type_coercion::functions::data_types(arg_types, &self.signature); coerced_types.map(|typs| typs[0].clone()) .map_err(|e| e.context("Failed to coerce arguments for NULLIF") - ) + ) } fn invoke(&self, args: &[ColumnarValue]) -> Result { @@ -90,14 +90,13 @@ impl ScalarUDFImpl for NullIfFunc { } - /// Implements NULLIF(expr1, expr2) /// Args: 0 - left expr is any array /// 1 - if the left is equal to this expr2, then the result is NULL, otherwise left value is passed. /// fn nullif_func(args: &[ColumnarValue]) -> Result { if args.len() != 2 { - return internal_err!( + return exec_err!( "{:?} args were supplied but NULLIF takes exactly two args", args.len() ); diff --git a/datafusion/functions/src/encoding/inner.rs b/datafusion/functions/src/encoding/inner.rs index 886a031a5269..4cbeab3092c7 100644 --- a/datafusion/functions/src/encoding/inner.rs +++ b/datafusion/functions/src/encoding/inner.rs @@ -22,11 +22,11 @@ use arrow::{ datatypes::DataType, }; use base64::{engine::general_purpose, Engine as _}; -use datafusion_common::ScalarValue; use datafusion_common::{ cast::{as_generic_binary_array, as_generic_string_array}, - internal_err, not_impl_err, plan_err, + not_impl_err, plan_err, }; +use datafusion_common::{exec_err, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; use std::sync::Arc; @@ -111,6 +111,7 @@ impl DecodeFunc { } } } + impl ScalarUDFImpl for DecodeFunc { fn as_any(&self) -> &dyn Any { self @@ -148,6 +149,7 @@ enum Encoding { Base64, Hex, } + fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result { match value { ColumnarValue::Array(a) => match a.data_type() { @@ -155,7 +157,7 @@ fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result encoding.encode_utf8_array::(a.as_ref()), DataType::Binary => encoding.encode_binary_array::(a.as_ref()), DataType::LargeBinary => encoding.encode_binary_array::(a.as_ref()), - other => internal_err!( + other => exec_err!( "Unsupported data type {other:?} for function encode({encoding})" ), }, @@ -171,7 +173,7 @@ fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result Ok(encoding .encode_large_scalar(a.as_ref().map(|v: &Vec| v.as_slice()))), - other => internal_err!( + other => exec_err!( "Unsupported data type {other:?} for function encode({encoding})" ), } @@ -186,7 +188,7 @@ fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result encoding.decode_utf8_array::(a.as_ref()), DataType::Binary => encoding.decode_binary_array::(a.as_ref()), DataType::LargeBinary => encoding.decode_binary_array::(a.as_ref()), - other => internal_err!( + other => exec_err!( "Unsupported data type {other:?} for function decode({encoding})" ), }, @@ -202,7 +204,7 @@ fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result encoding .decode_large_scalar(a.as_ref().map(|v: &Vec| v.as_slice())), - other => internal_err!( + other => exec_err!( "Unsupported data type {other:?} for function decode({encoding})" ), } @@ -270,8 +272,8 @@ impl Encoding { } fn encode_binary_array(self, value: &dyn Array) -> Result - where - T: OffsetSizeTrait, + where + T: OffsetSizeTrait, { let input_value = as_generic_binary_array::(value)?; let array: ArrayRef = match self { @@ -282,8 +284,8 @@ impl Encoding { } fn encode_utf8_array(self, value: &dyn Array) -> Result - where - T: OffsetSizeTrait, + where + T: OffsetSizeTrait, { let input_value = as_generic_string_array::(value)?; let array: ArrayRef = match self { @@ -350,8 +352,8 @@ impl Encoding { } fn decode_binary_array(self, value: &dyn Array) -> Result - where - T: OffsetSizeTrait, + where + T: OffsetSizeTrait, { let input_value = as_generic_binary_array::(value)?; let array: ArrayRef = match self { @@ -362,8 +364,8 @@ impl Encoding { } fn decode_utf8_array(self, value: &dyn Array) -> Result - where - T: OffsetSizeTrait, + where + T: OffsetSizeTrait, { let input_value = as_generic_string_array::(value)?; let array: ArrayRef = match self { @@ -405,7 +407,7 @@ impl FromStr for Encoding { /// Standard encodings are base64 and hex. fn encode(args: &[ColumnarValue]) -> Result { if args.len() != 2 { - return internal_err!( + return exec_err!( "{:?} args were supplied but encode takes exactly two arguments", args.len() ); @@ -431,7 +433,7 @@ fn encode(args: &[ColumnarValue]) -> Result { /// Standard encodings are base64 and hex. fn decode(args: &[ColumnarValue]) -> Result { if args.len() != 2 { - return internal_err!( + return exec_err!( "{:?} args were supplied but decode takes exactly two arguments", args.len() ); diff --git a/datafusion/functions/src/math/abs.rs b/datafusion/functions/src/math/abs.rs index 21ca37fb8ec3..9ba0e3da2ad4 100644 --- a/datafusion/functions/src/math/abs.rs +++ b/datafusion/functions/src/math/abs.rs @@ -24,9 +24,9 @@ use arrow::array::Int32Array; use arrow::array::Int64Array; use arrow::array::Int8Array; use arrow::datatypes::DataType; -use datafusion_common::not_impl_err; +use datafusion_common::{exec_err, not_impl_err}; use datafusion_common::plan_datafusion_err; -use datafusion_common::{internal_err, Result, DataFusionError}; +use datafusion_common::{Result, DataFusionError}; use datafusion_expr::utils; use datafusion_expr::ColumnarValue; @@ -165,7 +165,7 @@ impl ScalarUDFImpl for AbsFunc { let args = ColumnarValue::values_to_arrays(args)?; if args.len() != 1 { - return internal_err!("abs function requires 1 argument, got {}", args.len()); + return exec_err!("abs function requires 1 argument, got {}", args.len()); } let input_data_type = args[0].data_type(); diff --git a/datafusion/functions/src/math/nans.rs b/datafusion/functions/src/math/nans.rs index 20754c18aa8e..c7868e6d5eca 100644 --- a/datafusion/functions/src/math/nans.rs +++ b/datafusion/functions/src/math/nans.rs @@ -18,14 +18,14 @@ //! Encoding expressions use arrow::datatypes::DataType; -use datafusion_common::{internal_err, Result, DataFusionError}; +use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr::ColumnarValue; +use arrow::array::{ArrayRef, BooleanArray, Float32Array, Float64Array}; use datafusion_expr::TypeSignature::*; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; use std::any::Any; use std::sync::Arc; -use arrow::array::{ArrayRef, BooleanArray, Float32Array, Float64Array}; #[derive(Debug)] pub(super) struct IsNanFunc { @@ -73,7 +73,7 @@ impl ScalarUDFImpl for IsNanFunc { BooleanArray, { f64::is_nan } )) - }, + } DataType::Float32 => { Arc::new(make_function_scalar_inputs_return_type!( &args[0], @@ -82,8 +82,8 @@ impl ScalarUDFImpl for IsNanFunc { BooleanArray, { f32::is_nan } )) - }, - other => return internal_err!("Unsupported data type {other:?} for function isnan"), + } + other => return exec_err!("Unsupported data type {other:?} for function isnan"), }; Ok(ColumnarValue::Array(arr)) } diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index 1a3d21fc40bc..2918856aa623 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -28,14 +28,15 @@ use std::sync::Arc; +use arrow::datatypes::Schema; + +use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result}; +use datafusion_expr::AggregateFunction; + use crate::aggregate::regr::RegrType; use crate::expressions::{self, Literal}; use crate::{AggregateExpr, PhysicalExpr, PhysicalSortExpr}; -use arrow::datatypes::Schema; -use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; -use datafusion_expr::AggregateFunction; - /// Create a physical aggregation expression. /// This function errors when `input_phy_exprs`' can't be coerced to a valid argument type of the aggregation function. pub fn create_aggregate_expr( @@ -379,9 +380,7 @@ pub fn create_aggregate_expr( .downcast_ref::() .map(|literal| literal.value()) else { - return internal_err!( - "Second argument of NTH_VALUE needs to be a literal" - ); + return exec_err!("Second argument of NTH_VALUE needs to be a literal"); }; let nullable = expr.nullable(input_schema)?; Arc::new(expressions::NthValueAgg::new( @@ -415,17 +414,19 @@ pub fn create_aggregate_expr( #[cfg(test)] mod tests { - use super::*; + use arrow::datatypes::{DataType, Field}; + + use datafusion_common::{plan_err, ScalarValue}; + use datafusion_expr::type_coercion::aggregates::NUMERICS; + use datafusion_expr::{type_coercion, Signature}; + use crate::expressions::{ try_cast, ApproxDistinct, ApproxMedian, ApproxPercentileCont, ArrayAgg, Avg, BitAnd, BitOr, BitXor, BoolAnd, BoolOr, Correlation, Count, Covariance, DistinctArrayAgg, DistinctCount, Max, Min, Stddev, Sum, Variance, }; - use arrow::datatypes::{DataType, Field}; - use datafusion_common::{plan_err, ScalarValue}; - use datafusion_expr::type_coercion::aggregates::NUMERICS; - use datafusion_expr::{type_coercion, Signature}; + use super::*; #[test] fn test_count_arragg_approx_expr() -> Result<()> { diff --git a/datafusion/physical-expr/src/conditional_expressions.rs b/datafusion/physical-expr/src/conditional_expressions.rs index 782897d46379..cc8f3c8dfaf0 100644 --- a/datafusion/physical-expr/src/conditional_expressions.rs +++ b/datafusion/physical-expr/src/conditional_expressions.rs @@ -19,14 +19,14 @@ use arrow::array::{new_null_array, Array, BooleanArray}; use arrow::compute::kernels::zip::zip; use arrow::compute::{and, is_not_null, is_null}; -use datafusion_common::{internal_err, DataFusionError, Result}; +use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr::ColumnarValue; /// coalesce evaluates to the first value which is not NULL pub fn coalesce(args: &[ColumnarValue]) -> Result { // do not accept 0 arguments. if args.is_empty() { - return internal_err!( + return exec_err!( "coalesce was called with {} arguments. It requires at least 1.", args.len() ); diff --git a/datafusion/physical-expr/src/crypto_expressions.rs b/datafusion/physical-expr/src/crypto_expressions.rs index 580b0ed01b6e..3ff3bc83f297 100644 --- a/datafusion/physical-expr/src/crypto_expressions.rs +++ b/datafusion/physical-expr/src/crypto_expressions.rs @@ -23,11 +23,11 @@ use arrow::{ }; use blake2::{Blake2b512, Blake2s256, Digest}; use blake3::Hasher as Blake3; -use datafusion_common::ScalarValue; use datafusion_common::{ cast::{as_binary_array, as_generic_binary_array, as_generic_string_array}, plan_err, }; +use datafusion_common::{exec_err, ScalarValue}; use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_expr::ColumnarValue; use md5::Md5; @@ -66,7 +66,7 @@ fn digest_process( DataType::LargeBinary => { digest_algorithm.digest_binary_array::(a.as_ref()) } - other => internal_err!( + other => exec_err!( "Unsupported data type {other:?} for function {digest_algorithm}" ), }, @@ -77,7 +77,7 @@ fn digest_process( } ScalarValue::Binary(a) | ScalarValue::LargeBinary(a) => Ok(digest_algorithm .digest_scalar(a.as_ref().map(|v: &Vec| v.as_slice()))), - other => internal_err!( + other => exec_err!( "Unsupported data type {other:?} for function {digest_algorithm}" ), }, @@ -238,7 +238,7 @@ macro_rules! define_digest_function { #[doc = $DOC] pub fn $NAME(args: &[ColumnarValue]) -> Result { if args.len() != 1 { - return internal_err!( + return exec_err!( "{:?} args were supplied but {} takes exactly one argument", args.len(), DigestAlgorithm::$METHOD.to_string() @@ -264,7 +264,7 @@ fn hex_encode>(data: T) -> String { /// computes md5 hash digest of the given input pub fn md5(args: &[ColumnarValue]) -> Result { if args.len() != 1 { - return internal_err!( + return exec_err!( "{:?} args were supplied but {} takes exactly one argument", args.len(), DigestAlgorithm::Md5 @@ -284,7 +284,7 @@ pub fn md5(args: &[ColumnarValue]) -> Result { ColumnarValue::Scalar(ScalarValue::Binary(opt)) => { ColumnarValue::Scalar(ScalarValue::Utf8(opt.map(hex_encode::<_>))) } - _ => return internal_err!("Impossibly got invalid results from digest"), + _ => return exec_err!("Impossibly got invalid results from digest"), }) } @@ -329,7 +329,7 @@ define_digest_function!( /// Standard algorithms are md5, sha1, sha224, sha256, sha384 and sha512. pub fn digest(args: &[ColumnarValue]) -> Result { if args.len() != 2 { - return internal_err!( + return exec_err!( "{:?} args were supplied but digest takes exactly two arguments", args.len() ); @@ -339,7 +339,7 @@ pub fn digest(args: &[ColumnarValue]) -> Result { ScalarValue::Utf8(Some(method)) | ScalarValue::LargeUtf8(Some(method)) => { method.parse::() } - other => internal_err!("Unsupported data type {other:?} for function digest"), + other => exec_err!("Unsupported data type {other:?} for function digest"), }, ColumnarValue::Array(_) => { internal_err!("Digest using dynamically decided method is not yet supported") diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 0dc3f96dc12a..c91b96d67a22 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -42,7 +42,7 @@ use arrow::{ datatypes::{DataType, Int32Type, Int64Type, Schema}, }; use arrow_array::Array; -use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; +use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue}; pub use datafusion_expr::FuncMonotonicity; use datafusion_expr::{ type_coercion::functions::data_types, BuiltinScalarFunction, ColumnarValue, @@ -95,6 +95,7 @@ macro_rules! invoke_if_crypto_expressions_feature_flag { #[cfg(not(feature = "crypto_expressions"))] macro_rules! invoke_if_crypto_expressions_feature_flag { ($FUNC:ident, $NAME:expr) => { + use datafusion_common::internal_err; |_: &[ColumnarValue]| -> Result { internal_err!( "function {} requires compilation with feature flag: crypto_expressions.", @@ -433,7 +434,7 @@ pub fn create_physical_fun( DataType::LargeUtf8 => { make_scalar_function_inner(string_expressions::ascii::)(args) } - other => internal_err!("Unsupported data type {other:?} for function ascii"), + other => exec_err!("Unsupported data type {other:?} for function ascii"), }), BuiltinScalarFunction::BitLength => Arc::new(|args| match &args[0] { ColumnarValue::Array(v) => Ok(ColumnarValue::Array(bit_length(v.as_ref())?)), @@ -454,7 +455,7 @@ pub fn create_physical_fun( DataType::LargeUtf8 => { make_scalar_function_inner(string_expressions::btrim::)(args) } - other => internal_err!("Unsupported data type {other:?} for function btrim"), + other => exec_err!("Unsupported data type {other:?} for function btrim"), }), BuiltinScalarFunction::CharacterLength => { Arc::new(|args| match args[0].data_type() { @@ -474,7 +475,7 @@ pub fn create_physical_fun( ); make_scalar_function_inner(func)(args) } - other => internal_err!( + other => exec_err!( "Unsupported data type {other:?} for function character_length" ), }) @@ -536,7 +537,7 @@ pub fn create_physical_fun( make_scalar_function_inner(string_expressions::initcap::)(args) } other => { - internal_err!("Unsupported data type {other:?} for function initcap") + exec_err!("Unsupported data type {other:?} for function initcap") } }), BuiltinScalarFunction::InStr => Arc::new(|args| match args[0].data_type() { @@ -546,7 +547,7 @@ pub fn create_physical_fun( DataType::LargeUtf8 => { make_scalar_function_inner(string_expressions::instr::)(args) } - other => internal_err!("Unsupported data type {other:?} for function instr"), + other => exec_err!("Unsupported data type {other:?} for function instr"), }), BuiltinScalarFunction::Left => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { @@ -557,7 +558,7 @@ pub fn create_physical_fun( let func = invoke_if_unicode_expressions_feature_flag!(left, i64, "left"); make_scalar_function_inner(func)(args) } - other => internal_err!("Unsupported data type {other:?} for function left"), + other => exec_err!("Unsupported data type {other:?} for function left"), }), BuiltinScalarFunction::Lower => Arc::new(string_expressions::lower), BuiltinScalarFunction::Lpad => Arc::new(|args| match args[0].data_type() { @@ -569,7 +570,7 @@ pub fn create_physical_fun( let func = invoke_if_unicode_expressions_feature_flag!(lpad, i64, "lpad"); make_scalar_function_inner(func)(args) } - other => internal_err!("Unsupported data type {other:?} for function lpad"), + other => exec_err!("Unsupported data type {other:?} for function lpad"), }), BuiltinScalarFunction::Ltrim => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { @@ -578,7 +579,7 @@ pub fn create_physical_fun( DataType::LargeUtf8 => { make_scalar_function_inner(string_expressions::ltrim::)(args) } - other => internal_err!("Unsupported data type {other:?} for function ltrim"), + other => exec_err!("Unsupported data type {other:?} for function ltrim"), }), BuiltinScalarFunction::MD5 => { Arc::new(invoke_if_crypto_expressions_feature_flag!(md5, "md5")) @@ -616,7 +617,7 @@ pub fn create_physical_fun( make_scalar_function_inner(func)(args) } other => { - internal_err!("Unsupported data type {other:?} for function regexp_like") + exec_err!("Unsupported data type {other:?} for function regexp_like") } }), BuiltinScalarFunction::RegexpMatch => { @@ -637,9 +638,9 @@ pub fn create_physical_fun( ); make_scalar_function_inner(func)(args) } - other => internal_err!( - "Unsupported data type {other:?} for function regexp_match" - ), + other => { + exec_err!("Unsupported data type {other:?} for function regexp_match") + } }) } BuiltinScalarFunction::RegexpReplace => { @@ -662,7 +663,7 @@ pub fn create_physical_fun( let func = specializer_func(args)?; func(args) } - other => internal_err!( + other => exec_err!( "Unsupported data type {other:?} for function regexp_replace" ), }) @@ -674,7 +675,7 @@ pub fn create_physical_fun( DataType::LargeUtf8 => { make_scalar_function_inner(string_expressions::repeat::)(args) } - other => internal_err!("Unsupported data type {other:?} for function repeat"), + other => exec_err!("Unsupported data type {other:?} for function repeat"), }), BuiltinScalarFunction::Replace => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { @@ -684,7 +685,7 @@ pub fn create_physical_fun( make_scalar_function_inner(string_expressions::replace::)(args) } other => { - internal_err!("Unsupported data type {other:?} for function replace") + exec_err!("Unsupported data type {other:?} for function replace") } }), BuiltinScalarFunction::Reverse => Arc::new(|args| match args[0].data_type() { @@ -699,7 +700,7 @@ pub fn create_physical_fun( make_scalar_function_inner(func)(args) } other => { - internal_err!("Unsupported data type {other:?} for function reverse") + exec_err!("Unsupported data type {other:?} for function reverse") } }), BuiltinScalarFunction::Right => Arc::new(|args| match args[0].data_type() { @@ -713,7 +714,7 @@ pub fn create_physical_fun( invoke_if_unicode_expressions_feature_flag!(right, i64, "right"); make_scalar_function_inner(func)(args) } - other => internal_err!("Unsupported data type {other:?} for function right"), + other => exec_err!("Unsupported data type {other:?} for function right"), }), BuiltinScalarFunction::Rpad => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { @@ -724,7 +725,7 @@ pub fn create_physical_fun( let func = invoke_if_unicode_expressions_feature_flag!(rpad, i64, "rpad"); make_scalar_function_inner(func)(args) } - other => internal_err!("Unsupported data type {other:?} for function rpad"), + other => exec_err!("Unsupported data type {other:?} for function rpad"), }), BuiltinScalarFunction::Rtrim => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { @@ -733,7 +734,7 @@ pub fn create_physical_fun( DataType::LargeUtf8 => { make_scalar_function_inner(string_expressions::rtrim::)(args) } - other => internal_err!("Unsupported data type {other:?} for function rtrim"), + other => exec_err!("Unsupported data type {other:?} for function rtrim"), }), BuiltinScalarFunction::SHA224 => { Arc::new(invoke_if_crypto_expressions_feature_flag!(sha224, "sha224")) @@ -755,7 +756,7 @@ pub fn create_physical_fun( make_scalar_function_inner(string_expressions::split_part::)(args) } other => { - internal_err!("Unsupported data type {other:?} for function split_part") + exec_err!("Unsupported data type {other:?} for function split_part") } }), BuiltinScalarFunction::StringToArray => { @@ -767,7 +768,7 @@ pub fn create_physical_fun( array_expressions::string_to_array::, )(args), other => { - internal_err!( + exec_err!( "Unsupported data type {other:?} for function string_to_array" ) } @@ -781,7 +782,7 @@ pub fn create_physical_fun( make_scalar_function_inner(string_expressions::starts_with::)(args) } other => { - internal_err!("Unsupported data type {other:?} for function starts_with") + exec_err!("Unsupported data type {other:?} for function starts_with") } }), BuiltinScalarFunction::EndsWith => Arc::new(|args| match args[0].data_type() { @@ -792,7 +793,7 @@ pub fn create_physical_fun( make_scalar_function_inner(string_expressions::ends_with::)(args) } other => { - internal_err!("Unsupported data type {other:?} for function ends_with") + exec_err!("Unsupported data type {other:?} for function ends_with") } }), BuiltinScalarFunction::Strpos => Arc::new(|args| match args[0].data_type() { @@ -808,7 +809,7 @@ pub fn create_physical_fun( ); make_scalar_function_inner(func)(args) } - other => internal_err!("Unsupported data type {other:?} for function strpos"), + other => exec_err!("Unsupported data type {other:?} for function strpos"), }), BuiltinScalarFunction::Substr => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { @@ -821,7 +822,7 @@ pub fn create_physical_fun( invoke_if_unicode_expressions_feature_flag!(substr, i64, "substr"); make_scalar_function_inner(func)(args) } - other => internal_err!("Unsupported data type {other:?} for function substr"), + other => exec_err!("Unsupported data type {other:?} for function substr"), }), BuiltinScalarFunction::ToHex => Arc::new(|args| match args[0].data_type() { DataType::Int32 => { @@ -830,7 +831,7 @@ pub fn create_physical_fun( DataType::Int64 => { make_scalar_function_inner(string_expressions::to_hex::)(args) } - other => internal_err!("Unsupported data type {other:?} for function to_hex"), + other => exec_err!("Unsupported data type {other:?} for function to_hex"), }), BuiltinScalarFunction::Translate => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { @@ -850,7 +851,7 @@ pub fn create_physical_fun( make_scalar_function_inner(func)(args) } other => { - internal_err!("Unsupported data type {other:?} for function translate") + exec_err!("Unsupported data type {other:?} for function translate") } }), BuiltinScalarFunction::Trim => Arc::new(|args| match args[0].data_type() { @@ -860,13 +861,13 @@ pub fn create_physical_fun( DataType::LargeUtf8 => { make_scalar_function_inner(string_expressions::btrim::)(args) } - other => internal_err!("Unsupported data type {other:?} for function trim"), + other => exec_err!("Unsupported data type {other:?} for function trim"), }), BuiltinScalarFunction::Upper => Arc::new(string_expressions::upper), BuiltinScalarFunction::Uuid => Arc::new(string_expressions::uuid), BuiltinScalarFunction::ArrowTypeof => Arc::new(move |args| { if args.len() != 1 { - return internal_err!( + return exec_err!( "arrow_typeof function requires 1 arguments, got {}", args.len() ); @@ -884,9 +885,7 @@ pub fn create_physical_fun( DataType::LargeUtf8 => { make_scalar_function_inner(string_expressions::overlay::)(args) } - other => Err(DataFusionError::Internal(format!( - "Unsupported data type {other:?} for function overlay", - ))), + other => exec_err!("Unsupported data type {other:?} for function overlay"), }), BuiltinScalarFunction::Levenshtein => { Arc::new(|args| match args[0].data_type() { @@ -896,9 +895,9 @@ pub fn create_physical_fun( DataType::LargeUtf8 => make_scalar_function_inner( string_expressions::levenshtein::, )(args), - other => Err(DataFusionError::Internal(format!( - "Unsupported data type {other:?} for function levenshtein", - ))), + other => { + exec_err!("Unsupported data type {other:?} for function levenshtein") + } }) } BuiltinScalarFunction::SubstrIndex => { @@ -919,9 +918,9 @@ pub fn create_physical_fun( ); make_scalar_function_inner(func)(args) } - other => Err(DataFusionError::Internal(format!( - "Unsupported data type {other:?} for function substr_index", - ))), + other => { + exec_err!("Unsupported data type {other:?} for function substr_index") + } }) } BuiltinScalarFunction::FindInSet => Arc::new(|args| match args[0].data_type() { @@ -941,9 +940,9 @@ pub fn create_physical_fun( ); make_scalar_function_inner(func)(args) } - other => Err(DataFusionError::Internal(format!( - "Unsupported data type {other:?} for function find_in_set", - ))), + other => { + exec_err!("Unsupported data type {other:?} for function find_in_set") + } }), }) } @@ -1023,7 +1022,7 @@ mod tests { record_batch::RecordBatch, }; use datafusion_common::cast::{as_boolean_array, as_uint64_array}; - use datafusion_common::{exec_err, plan_err}; + use datafusion_common::{exec_err, internal_err, plan_err}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::type_coercion::functions::data_types; use datafusion_expr::Signature; diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index b622aee8e2b3..98a05dff5386 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -17,19 +17,20 @@ //! Math expressions +use std::any::type_name; +use std::iter; +use std::mem::swap; +use std::sync::Arc; + use arrow::array::ArrayRef; use arrow::array::{BooleanArray, Float32Array, Float64Array, Int64Array}; use arrow::datatypes::DataType; -use datafusion_common::internal_err; -use datafusion_common::ScalarValue; +use rand::{thread_rng, Rng}; + use datafusion_common::ScalarValue::{Float32, Int64}; +use datafusion_common::{exec_err, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; -use rand::{thread_rng, Rng}; -use std::any::type_name; -use std::iter; -use std::mem::swap; -use std::sync::Arc; macro_rules! downcast_compute_op { ($ARRAY:expr, $NAME:expr, $FUNC:ident, $TYPE:ident) => {{ @@ -40,7 +41,7 @@ macro_rules! downcast_compute_op { arrow::compute::kernels::arity::unary(array, |x| x.$FUNC()); Ok(Arc::new(res)) } - _ => internal_err!("Invalid data type for {}", $NAME), + _ => exec_err!("Invalid data type for {}", $NAME), } }}; } @@ -57,11 +58,9 @@ macro_rules! unary_primitive_array_op { let result = downcast_compute_op!(array, $NAME, $FUNC, Float64Array); Ok(ColumnarValue::Array(result?)) } - other => internal_err!( - "Unsupported data type {:?} for function {}", - other, - $NAME - ), + other => { + exec_err!("Unsupported data type {:?} for function {}", other, $NAME) + } }, ColumnarValue::Scalar(a) => match a { ScalarValue::Float32(a) => Ok(ColumnarValue::Scalar( @@ -70,7 +69,7 @@ macro_rules! unary_primitive_array_op { ScalarValue::Float64(a) => Ok(ColumnarValue::Scalar( ScalarValue::Float64(a.map(|x| x.$FUNC())), )), - _ => internal_err!( + _ => exec_err!( "Unsupported data type {:?} for function {}", ($VALUE).data_type(), $NAME @@ -187,7 +186,7 @@ pub fn factorial(args: &[ArrayRef]) -> Result { Int64Array, { |value: i64| { (1..=value).product() } } )) as ArrayRef), - other => internal_err!("Unsupported data type {other:?} for function factorial."), + other => exec_err!("Unsupported data type {other:?} for function factorial."), } } @@ -234,7 +233,7 @@ pub fn gcd(args: &[ArrayRef]) -> Result { Int64Array, { compute_gcd } )) as ArrayRef), - other => internal_err!("Unsupported data type {other:?} for function gcd"), + other => exec_err!("Unsupported data type {other:?} for function gcd"), } } @@ -260,7 +259,7 @@ pub fn lcm(args: &[ArrayRef]) -> Result { Int64Array, { compute_lcm } )) as ArrayRef), - other => internal_err!("Unsupported data type {other:?} for function lcm"), + other => exec_err!("Unsupported data type {other:?} for function lcm"), } } @@ -305,7 +304,7 @@ pub fn nanvl(args: &[ArrayRef]) -> Result { )) as ArrayRef) } - other => internal_err!("Unsupported data type {other:?} for function nanvl"), + other => exec_err!("Unsupported data type {other:?} for function nanvl"), } } @@ -328,7 +327,7 @@ pub fn isnan(args: &[ArrayRef]) -> Result { { f32::is_nan } )) as ArrayRef), - other => internal_err!("Unsupported data type {other:?} for function isnan"), + other => exec_err!("Unsupported data type {other:?} for function isnan"), } } @@ -351,14 +350,14 @@ pub fn iszero(args: &[ArrayRef]) -> Result { { |x: f32| { x == 0_f32 } } )) as ArrayRef), - other => internal_err!("Unsupported data type {other:?} for function iszero"), + other => exec_err!("Unsupported data type {other:?} for function iszero"), } } /// Pi SQL function pub fn pi(args: &[ColumnarValue]) -> Result { if !matches!(&args[0], ColumnarValue::Array(_)) { - return internal_err!("Expect pi function to take no param"); + return exec_err!("Expect pi function to take no param"); } let array = Float64Array::from_value(std::f64::consts::PI, 1); Ok(ColumnarValue::Array(Arc::new(array))) @@ -368,7 +367,7 @@ pub fn pi(args: &[ColumnarValue]) -> Result { pub fn random(args: &[ColumnarValue]) -> Result { let len: usize = match &args[0] { ColumnarValue::Array(array) => array.len(), - _ => return internal_err!("Expect random function to take no param"), + _ => return exec_err!("Expect random function to take no param"), }; let mut rng = thread_rng(); let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len); @@ -379,7 +378,7 @@ pub fn random(args: &[ColumnarValue]) -> Result { /// Round SQL function pub fn round(args: &[ArrayRef]) -> Result { if args.len() != 1 && args.len() != 2 { - return internal_err!( + return exec_err!( "round function requires one or two arguments, got {}", args.len() ); @@ -423,9 +422,9 @@ pub fn round(args: &[ArrayRef]) -> Result { } } )) as ArrayRef), - _ => internal_err!( - "round function requires a scalar or array for decimal_places" - ), + _ => { + exec_err!("round function requires a scalar or array for decimal_places") + } }, DataType::Float32 => match decimal_places { @@ -459,12 +458,12 @@ pub fn round(args: &[ArrayRef]) -> Result { } } )) as ArrayRef), - _ => internal_err!( - "round function requires a scalar or array for decimal_places" - ), + _ => { + exec_err!("round function requires a scalar or array for decimal_places") + } }, - other => internal_err!("Unsupported data type {other:?} for function round"), + other => exec_err!("Unsupported data type {other:?} for function round"), } } @@ -489,7 +488,7 @@ pub fn power(args: &[ArrayRef]) -> Result { { i64::pow } )) as ArrayRef), - other => internal_err!("Unsupported data type {other:?} for function power"), + other => exec_err!("Unsupported data type {other:?} for function power"), } } @@ -514,7 +513,7 @@ pub fn atan2(args: &[ArrayRef]) -> Result { { f32::atan2 } )) as ArrayRef), - other => internal_err!("Unsupported data type {other:?} for function atan2"), + other => exec_err!("Unsupported data type {other:?} for function atan2"), } } @@ -547,7 +546,7 @@ pub fn log(args: &[ArrayRef]) -> Result { Float64Array, { f64::log } )) as ArrayRef), - _ => internal_err!("log function requires a scalar or array for base"), + _ => exec_err!("log function requires a scalar or array for base"), }, DataType::Float32 => match base { @@ -565,10 +564,10 @@ pub fn log(args: &[ArrayRef]) -> Result { Float32Array, { f32::log } )) as ArrayRef), - _ => internal_err!("log function requires a scalar or array for base"), + _ => exec_err!("log function requires a scalar or array for base"), }, - other => internal_err!("Unsupported data type {other:?} for function log"), + other => exec_err!("Unsupported data type {other:?} for function log"), } } @@ -589,7 +588,7 @@ pub fn cot(args: &[ArrayRef]) -> Result { { compute_cot32 } )) as ArrayRef), - other => internal_err!("Unsupported data type {other:?} for function cot"), + other => exec_err!("Unsupported data type {other:?} for function cot"), } } @@ -606,7 +605,7 @@ fn compute_cot64(x: f64) -> f64 { /// Truncate(numeric, decimalPrecision) and trunc(numeric) SQL function pub fn trunc(args: &[ArrayRef]) -> Result { if args.len() != 1 && args.len() != 2 { - return internal_err!( + return exec_err!( "truncate function requires one or two arguments, got {}", args.len() ); @@ -635,7 +634,7 @@ pub fn trunc(args: &[ArrayRef]) -> Result { Int64Array, { compute_truncate64 } )) as ArrayRef), - _ => internal_err!("trunc function requires a scalar or array for precision"), + _ => exec_err!("trunc function requires a scalar or array for precision"), }, DataType::Float32 => match precision { ColumnarValue::Scalar(Int64(Some(0))) => Ok(Arc::new( @@ -650,9 +649,9 @@ pub fn trunc(args: &[ArrayRef]) -> Result { Int64Array, { compute_truncate32 } )) as ArrayRef), - _ => internal_err!("trunc function requires a scalar or array for precision"), + _ => exec_err!("trunc function requires a scalar or array for precision"), }, - other => internal_err!("Unsupported data type {other:?} for function trunc"), + other => exec_err!("Unsupported data type {other:?} for function trunc"), } } @@ -668,13 +667,14 @@ fn compute_truncate64(x: f64, y: i64) -> f64 { #[cfg(test)] mod tests { - - use super::*; use arrow::array::{Float64Array, NullArray}; + use datafusion_common::cast::{ as_boolean_array, as_float32_array, as_float64_array, as_int64_array, }; + use super::*; + #[test] fn test_random_expression() { let args = vec![ColumnarValue::Array(Arc::new(NullArray::new(1)))]; diff --git a/datafusion/physical-expr/src/regex_expressions.rs b/datafusion/physical-expr/src/regex_expressions.rs index b1334854ba0b..846e5801af1c 100644 --- a/datafusion/physical-expr/src/regex_expressions.rs +++ b/datafusion/physical-expr/src/regex_expressions.rs @@ -21,19 +21,18 @@ //! Regex expressions +use std::sync::{Arc, OnceLock}; + use arrow::array::{ new_null_array, Array, ArrayDataBuilder, ArrayRef, BufferBuilder, GenericStringArray, OffsetSizeTrait, }; +use hashbrown::HashMap; +use regex::Regex; use datafusion_common::{arrow_datafusion_err, exec_err, plan_err}; -use datafusion_common::{ - cast::as_generic_string_array, internal_err, DataFusionError, Result, -}; +use datafusion_common::{cast::as_generic_string_array, DataFusionError, Result}; use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation}; -use hashbrown::HashMap; -use regex::Regex; -use std::sync::{Arc, OnceLock}; use crate::functions::{ make_scalar_function_inner, make_scalar_function_with_hints, Hint, @@ -188,7 +187,7 @@ pub fn regexp_match(args: &[ArrayRef]) -> Result { arrow_string::regexp::regexp_match(values, regex, Some(flags)) .map_err(|e| arrow_datafusion_err!(e)) } - other => internal_err!( + other => exec_err!( "regexp_match was called with {other} arguments. It requires at least 2 and at most 3." ), } @@ -341,7 +340,7 @@ pub fn regexp_replace(args: &[ArrayRef]) -> Result Ok(Arc::new(result) as ArrayRef) } - other => internal_err!( + other => exec_err!( "regexp_replace was called with {other} arguments. It requires at least 3 and at most 4." ), } @@ -374,7 +373,7 @@ fn _regexp_replace_static_pattern_replace( 3 => None, 4 => Some(fetch_string_arg!(&args[3], "flags", T, _regexp_replace_early_abort)), other => { - return internal_err!( + return exec_err!( "regexp_replace was called with {other} arguments. It requires at least 3 and at most 4." ) } diff --git a/datafusion/physical-expr/src/string_expressions.rs b/datafusion/physical-expr/src/string_expressions.rs index 34a436ebe3cd..6a4a29763e4b 100644 --- a/datafusion/physical-expr/src/string_expressions.rs +++ b/datafusion/physical-expr/src/string_expressions.rs @@ -21,6 +21,12 @@ //! String expressions +use std::sync::Arc; +use std::{ + fmt::{Display, Formatter}, + iter, +}; + use arrow::{ array::{ Array, ArrayRef, GenericStringArray, Int32Array, Int64Array, OffsetSizeTrait, @@ -28,6 +34,8 @@ use arrow::{ }, datatypes::{ArrowNativeType, ArrowPrimitiveType, DataType}, }; +use uuid::Uuid; + use datafusion_common::utils::datafusion_strsim; use datafusion_common::{ cast::{ @@ -35,14 +43,8 @@ use datafusion_common::{ }, exec_err, ScalarValue, }; -use datafusion_common::{internal_err, DataFusionError, Result}; +use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; -use std::sync::Arc; -use std::{ - fmt::{Display, Formatter}, - iter, -}; -use uuid::Uuid; /// applies a unary expression to `args[0]` that is expected to be downcastable to /// a `GenericStringArray` and returns a `GenericStringArray` (which may have a different offset) @@ -62,7 +64,7 @@ where F: Fn(&'a str) -> R, { if args.len() != 1 { - return internal_err!( + return exec_err!( "{:?} args were supplied but {} takes exactly one argument", args.len(), name @@ -102,7 +104,7 @@ where &[a.as_ref()], op, name )?))) } - other => internal_err!("Unsupported data type {other:?} for function {name}"), + other => exec_err!("Unsupported data type {other:?} for function {name}"), }, ColumnarValue::Scalar(scalar) => match scalar { ScalarValue::Utf8(a) => { @@ -113,7 +115,7 @@ where let result = a.as_ref().map(|x| (op)(x).as_ref().to_string()); Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(result))) } - other => internal_err!("Unsupported data type {other:?} for function {name}"), + other => exec_err!("Unsupported data type {other:?} for function {name}"), }, } } @@ -170,7 +172,7 @@ pub fn chr(args: &[ArrayRef]) -> Result { pub fn concat(args: &[ColumnarValue]) -> Result { // do not accept 0 arguments. if args.is_empty() { - return internal_err!( + return exec_err!( "concat was called with {} arguments. It requires at least 1.", args.len() ); @@ -236,7 +238,7 @@ pub fn concat_ws(args: &[ArrayRef]) -> Result { // do not accept 0 or 1 arguments. if args.len() < 2 { - return internal_err!( + return exec_err!( "concat_ws was called with {} arguments. It requires at least 2.", args.len() ); @@ -333,7 +335,7 @@ pub fn instr(args: &[ArrayRef]) -> Result { Ok(Arc::new(result) as ArrayRef) } other => { - internal_err!( + exec_err!( "instr was called with {other} datatype arguments. It requires Utf8 or LargeUtf8." ) } @@ -410,7 +412,7 @@ fn general_trim( Ok(Arc::new(result) as ArrayRef) } other => { - internal_err!( + exec_err!( "{trim_type} was called with {other} arguments. It requires at least 1 and at most 2." ) } @@ -541,7 +543,7 @@ where } else if let Some(value_isize) = value.to_isize() { Ok(Some(format!("{value_isize:x}"))) } else { - internal_err!("Unsupported data type {integer:?} for function to_hex") + exec_err!("Unsupported data type {integer:?} for function to_hex") } } else { Ok(None) @@ -563,7 +565,7 @@ pub fn upper(args: &[ColumnarValue]) -> Result { pub fn uuid(args: &[ColumnarValue]) -> Result { let len: usize = match &args[0] { ColumnarValue::Array(array) => array.len(), - _ => return internal_err!("Expect uuid function to take no param"), + _ => return exec_err!("Expect uuid function to take no param"), }; let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len); @@ -654,9 +656,7 @@ pub fn overlay(args: &[ArrayRef]) -> Result { Ok(Arc::new(result) as ArrayRef) } other => { - internal_err!( - "overlay was called with {other} arguments. It requires 3 or 4." - ) + exec_err!("overlay was called with {other} arguments. It requires 3 or 4.") } } } @@ -665,10 +665,10 @@ pub fn overlay(args: &[ArrayRef]) -> Result { /// LEVENSHTEIN('kitten', 'sitting') = 3 pub fn levenshtein(args: &[ArrayRef]) -> Result { if args.len() != 2 { - return Err(DataFusionError::Internal(format!( + return exec_err!( "levenshtein function requires two arguments, got {}", args.len() - ))); + ); } let str1_array = as_generic_string_array::(&args[0])?; let str2_array = as_generic_string_array::(&args[1])?; @@ -700,7 +700,7 @@ pub fn levenshtein(args: &[ArrayRef]) -> Result { Ok(Arc::new(result) as ArrayRef) } other => { - internal_err!( + exec_err!( "levenshtein was called with {other} datatype arguments. It requires Utf8 or LargeUtf8." ) } @@ -709,12 +709,13 @@ pub fn levenshtein(args: &[ArrayRef]) -> Result { #[cfg(test)] mod tests { - - use crate::string_expressions; use arrow::{array::Int32Array, datatypes::Int32Type}; use arrow_array::Int64Array; + use datafusion_common::cast::as_int32_array; + use crate::string_expressions; + use super::*; #[test] diff --git a/datafusion/physical-expr/src/unicode_expressions.rs b/datafusion/physical-expr/src/unicode_expressions.rs index 240efe4223c3..3209a6176fad 100644 --- a/datafusion/physical-expr/src/unicode_expressions.rs +++ b/datafusion/physical-expr/src/unicode_expressions.rs @@ -21,18 +21,20 @@ //! Unicode expressions +use std::cmp::{max, Ordering}; +use std::sync::Arc; + use arrow::{ array::{ArrayRef, GenericStringArray, OffsetSizeTrait, PrimitiveArray}, datatypes::{ArrowNativeType, ArrowPrimitiveType}, }; +use hashbrown::HashMap; +use unicode_segmentation::UnicodeSegmentation; + use datafusion_common::{ cast::{as_generic_string_array, as_int64_array}, - exec_err, internal_err, DataFusionError, Result, + exec_err, DataFusionError, Result, }; -use hashbrown::HashMap; -use std::cmp::{max, Ordering}; -use std::sync::Arc; -use unicode_segmentation::UnicodeSegmentation; /// Returns number of characters in the string. /// character_length('josé') = 4 @@ -312,7 +314,7 @@ pub fn rpad(args: &[ArrayRef]) -> Result { Ok(Arc::new(result) as ArrayRef) } - other => internal_err!( + other => exec_err!( "rpad was called with {other} arguments. It requires at least 2 and at most 3." ), } @@ -407,7 +409,7 @@ pub fn substr(args: &[ArrayRef]) -> Result { Ok(Arc::new(result) as ArrayRef) } other => { - internal_err!("substr was called with {other} arguments. It requires 2 or 3.") + exec_err!("substr was called with {other} arguments. It requires 2 or 3.") } } } @@ -463,7 +465,7 @@ pub fn translate(args: &[ArrayRef]) -> Result { /// SUBSTRING_INDEX('www.apache.org', '.', -1) = org pub fn substr_index(args: &[ArrayRef]) -> Result { if args.len() != 3 { - return internal_err!( + return exec_err!( "substr_index was called with {} arguments. It requires 3.", args.len() ); @@ -528,7 +530,7 @@ where T::Native: OffsetSizeTrait, { if args.len() != 2 { - return internal_err!( + return exec_err!( "find_in_set was called with {} arguments. It requires 2.", args.len() );